6f26eb3e8e
* remove unnecessary utf-8 header in .py files Signed-off-by: samuel orji <awesomeorji@gmail.com> * review feedback: add link to changelog Signed-off-by: samuel orji <awesomeorji@gmail.com> --------- Signed-off-by: samuel orji <awesomeorji@gmail.com>
507 lines
20 KiB
Python
507 lines
20 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
# The OpenSearch Contributors require contributions made to
|
|
# this file be licensed under the Apache-2.0 license or a
|
|
# compatible open source license.
|
|
#
|
|
# Modifications Copyright OpenSearch Contributors. See
|
|
# GitHub history for details.
|
|
#
|
|
# Licensed to Elasticsearch B.V. under one or more contributor
|
|
# license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright
|
|
# ownership. Elasticsearch B.V. licenses this file to you under
|
|
# the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
|
|
import time
|
|
from itertools import chain
|
|
from typing import Any, Callable, Collection, Dict, List, Mapping, Optional, Type, Union
|
|
|
|
from .connection import Connection, Urllib3HttpConnection
|
|
from .connection_pool import ConnectionPool, DummyConnectionPool, EmptyConnectionPool
|
|
from .exceptions import (
|
|
ConnectionError,
|
|
ConnectionTimeout,
|
|
SerializationError,
|
|
TransportError,
|
|
)
|
|
from .serializer import DEFAULT_SERIALIZERS, Deserializer, JSONSerializer, Serializer
|
|
|
|
|
|
def get_host_info(
|
|
node_info: Dict[str, Any], host: Optional[Dict[str, Any]]
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Simple callback that takes the node info from `/_cluster/nodes` and a
|
|
parsed connection information and return the connection information. If
|
|
`None` is returned this node will be skipped.
|
|
|
|
Useful for filtering nodes (by proximity for example) or if additional
|
|
information needs to be provided for the :class:`~opensearchpy.Connection`
|
|
class. By default cluster_manager only nodes are filtered out since they shouldn't
|
|
typically be used for API operations.
|
|
|
|
:arg node_info: node information from `/_cluster/nodes`
|
|
:arg host: connection information (host, port) extracted from the node info
|
|
"""
|
|
# ignore cluster_manager only nodes
|
|
if node_info.get("roles", []) == ["cluster_manager"]:
|
|
return None
|
|
return host
|
|
|
|
|
|
class Transport(object):
|
|
"""
|
|
Encapsulation of transport-related to logic. Handles instantiation of the
|
|
individual connections as well as creating a connection pool to hold them.
|
|
|
|
Main interface is the `perform_request` method.
|
|
"""
|
|
|
|
DEFAULT_CONNECTION_CLASS: Type[Connection] = Urllib3HttpConnection
|
|
|
|
connection_pool: Any
|
|
deserializer: Deserializer
|
|
|
|
max_retries: int
|
|
retry_on_timeout: bool
|
|
retry_on_status: Collection[int]
|
|
send_get_body_as: str
|
|
serializer: Serializer
|
|
connection_pool_class: Any
|
|
connection_class: Type[Connection]
|
|
kwargs: Any
|
|
hosts: Any
|
|
seed_connections: List[Connection]
|
|
sniffer_timeout: Optional[float]
|
|
sniff_on_start: bool
|
|
sniff_on_connection_fail: bool
|
|
last_sniff: float
|
|
sniff_timeout: Optional[float]
|
|
host_info_callback: Any
|
|
|
|
def __init__(
|
|
self,
|
|
hosts: Any,
|
|
connection_class: Optional[Type[Connection]] = None,
|
|
connection_pool_class: Type[ConnectionPool] = ConnectionPool,
|
|
host_info_callback: Callable[
|
|
[Dict[str, Any], Optional[Dict[str, Any]]], Optional[Dict[str, Any]]
|
|
] = get_host_info,
|
|
sniff_on_start: bool = False,
|
|
sniffer_timeout: Optional[float] = None,
|
|
sniff_timeout: float = 0.1,
|
|
sniff_on_connection_fail: bool = False,
|
|
serializer: Serializer = JSONSerializer(),
|
|
serializers: Optional[Mapping[str, Serializer]] = None,
|
|
default_mimetype: str = "application/json",
|
|
max_retries: int = 3,
|
|
pool_maxsize: Optional[int] = None,
|
|
retry_on_status: Collection[int] = (502, 503, 504),
|
|
retry_on_timeout: bool = False,
|
|
send_get_body_as: str = "GET",
|
|
**kwargs: Any
|
|
) -> None:
|
|
"""
|
|
:arg hosts: list of dictionaries, each containing keyword arguments to
|
|
create a `connection_class` instance
|
|
:arg connection_class: subclass of :class:`~opensearchpy.Connection` to use
|
|
:arg connection_pool_class: subclass of :class:`~opensearchpy.ConnectionPool` to use
|
|
:arg host_info_callback: callback responsible for taking the node information from
|
|
`/_cluster/nodes`, along with already extracted information, and
|
|
producing a list of arguments (same as `hosts` parameter)
|
|
:arg sniff_on_start: flag indicating whether to obtain a list of nodes
|
|
from the cluster at startup time
|
|
:arg sniffer_timeout: number of seconds between automatic sniffs
|
|
:arg sniff_on_connection_fail: flag controlling if connection failure triggers a sniff
|
|
:arg sniff_timeout: timeout used for the sniff request - it should be a
|
|
fast api call and we are talking potentially to more nodes so we want
|
|
to fail quickly. Not used during initial sniffing (if
|
|
``sniff_on_start`` is on) when the connection still isn't
|
|
initialized.
|
|
:arg serializer: serializer instance
|
|
:arg serializers: optional dict of serializer instances that will be
|
|
used for deserializing data coming from the server. (key is the mimetype)
|
|
:arg default_mimetype: when no mimetype is specified by the server
|
|
response assume this mimetype, defaults to `'application/json'`
|
|
:arg max_retries: maximum number of retries before an exception is propagated
|
|
:arg retry_on_status: set of HTTP status codes on which we should retry
|
|
on a different node. defaults to ``(502, 503, 504)``
|
|
:arg retry_on_timeout: should timeout trigger a retry on different
|
|
node? (default `False`)
|
|
:arg send_get_body_as: for GET requests with body this option allows
|
|
you to specify an alternate way of execution for environments that
|
|
don't support passing bodies with GET requests. If you set this to
|
|
'POST' a POST method will be used instead, if to 'source' then the body
|
|
will be serialized and passed as a query parameter `source`.
|
|
:arg pool_maxsize: Maximum connection pool size used by pool-manager
|
|
For custom connection-pooling on current session
|
|
|
|
Any extra keyword arguments will be passed to the `connection_class`
|
|
when creating and instance unless overridden by that connection's
|
|
options provided as part of the hosts parameter.
|
|
"""
|
|
if connection_class is None:
|
|
connection_class = self.DEFAULT_CONNECTION_CLASS
|
|
|
|
# serialization config
|
|
_serializers = DEFAULT_SERIALIZERS.copy()
|
|
# if a serializer has been specified, use it for deserialization as well
|
|
_serializers[serializer.mimetype] = serializer
|
|
# if custom serializers map has been supplied, override the defaults with it
|
|
if serializers:
|
|
_serializers.update(serializers)
|
|
# create a deserializer with our config
|
|
self.deserializer = Deserializer(_serializers, default_mimetype)
|
|
|
|
self.max_retries = max_retries
|
|
self.pool_maxsize = pool_maxsize
|
|
self.retry_on_timeout = retry_on_timeout
|
|
self.retry_on_status = retry_on_status
|
|
self.send_get_body_as = send_get_body_as
|
|
|
|
# data serializer
|
|
self.serializer = serializer
|
|
|
|
# store all strategies...
|
|
self.connection_pool_class = connection_pool_class
|
|
self.connection_class = connection_class
|
|
|
|
# ...save kwargs to be passed to the connections
|
|
self.kwargs = kwargs
|
|
self.hosts = hosts
|
|
|
|
# Start with an empty pool specifically for `AsyncTransport`.
|
|
# It should never be used, will be replaced on first call to
|
|
# .set_connections()
|
|
self.connection_pool = EmptyConnectionPool()
|
|
|
|
if hosts:
|
|
# ...and instantiate them
|
|
self.set_connections(hosts)
|
|
# retain the original connection instances for sniffing
|
|
self.seed_connections = list(self.connection_pool.connections[:])
|
|
else:
|
|
self.seed_connections = []
|
|
|
|
# sniffing data
|
|
self.sniffer_timeout = sniffer_timeout
|
|
self.sniff_on_start = sniff_on_start
|
|
self.sniff_on_connection_fail = sniff_on_connection_fail
|
|
self.last_sniff = time.time()
|
|
self.sniff_timeout = sniff_timeout
|
|
|
|
# callback to construct host dict from data in /_cluster/nodes
|
|
self.host_info_callback = host_info_callback
|
|
|
|
if sniff_on_start:
|
|
self.sniff_hosts(True)
|
|
|
|
def add_connection(self, host: Any) -> None:
|
|
"""
|
|
Create a new :class:`~opensearchpy.Connection` instance and add it to the pool.
|
|
|
|
:arg host: kwargs that will be used to create the instance
|
|
"""
|
|
self.hosts.append(host)
|
|
self.set_connections(self.hosts)
|
|
|
|
def set_connections(self, hosts: Any) -> None:
|
|
"""
|
|
Instantiate all the connections and create new connection pool to hold them.
|
|
Tries to identify unchanged hosts and re-use existing
|
|
:class:`~opensearchpy.Connection` instances.
|
|
|
|
:arg hosts: same as `__init__`
|
|
"""
|
|
|
|
# construct the connections
|
|
def _create_connection(host: Any) -> Any:
|
|
# if this is not the initial setup look at the existing connection
|
|
# options and identify connections that haven't changed and can be
|
|
# kept around.
|
|
if hasattr(self, "connection_pool"):
|
|
for connection, old_host in self.connection_pool.connection_opts:
|
|
if old_host == host:
|
|
return connection
|
|
|
|
# previously unseen params, create new connection
|
|
kwargs = self.kwargs.copy()
|
|
kwargs.update(host)
|
|
if self.pool_maxsize and isinstance(self.pool_maxsize, int):
|
|
kwargs["pool_maxsize"] = self.pool_maxsize
|
|
return self.connection_class(**kwargs)
|
|
|
|
connections = list(zip(map(_create_connection, hosts), hosts))
|
|
if len(connections) == 1:
|
|
self.connection_pool = DummyConnectionPool(connections)
|
|
else:
|
|
# pass the hosts dicts to the connection pool to optionally extract parameters from
|
|
self.connection_pool = self.connection_pool_class(
|
|
connections, **self.kwargs
|
|
)
|
|
|
|
def get_connection(self) -> Any:
|
|
"""
|
|
Retrieve a :class:`~opensearchpy.Connection` instance from the
|
|
:class:`~opensearchpy.ConnectionPool` instance.
|
|
"""
|
|
if self.sniffer_timeout:
|
|
if time.time() >= self.last_sniff + self.sniffer_timeout:
|
|
self.sniff_hosts()
|
|
return self.connection_pool.get_connection()
|
|
|
|
def _get_sniff_data(self, initial: bool = False) -> Any:
|
|
"""
|
|
Perform the request to get sniffing information. Returns a list of
|
|
dictionaries (one per node) containing all the information from the
|
|
cluster.
|
|
|
|
It also sets the last_sniff attribute in case of a successful attempt.
|
|
|
|
In rare cases it might be possible to override this method in your
|
|
custom Transport class to serve data from alternative source like
|
|
configuration management.
|
|
"""
|
|
previous_sniff = self.last_sniff
|
|
|
|
try:
|
|
# reset last_sniff timestamp
|
|
self.last_sniff = time.time()
|
|
# go through all current connections as well as the
|
|
# seed_connections for good measure
|
|
for c in chain(self.connection_pool.connections, self.seed_connections):
|
|
try:
|
|
# use small timeout for the sniffing request, should be a fast api call
|
|
_, headers, node_info = c.perform_request(
|
|
"GET",
|
|
"/_nodes/_all/http",
|
|
timeout=self.sniff_timeout if not initial else None,
|
|
)
|
|
|
|
# Lowercase all the header names for consistency in accessing them.
|
|
headers = {
|
|
header.lower(): value for header, value in headers.items()
|
|
}
|
|
|
|
node_info = self.deserializer.loads(
|
|
node_info, headers.get("content-type")
|
|
)
|
|
break
|
|
except (ConnectionError, SerializationError):
|
|
pass
|
|
else:
|
|
raise TransportError("N/A", "Unable to sniff hosts.")
|
|
except Exception:
|
|
# keep the previous value on error
|
|
self.last_sniff = previous_sniff
|
|
raise
|
|
|
|
return list(node_info["nodes"].values())
|
|
|
|
def _get_host_info(self, host_info: Any) -> Any:
|
|
host = {}
|
|
address = host_info.get("http", {}).get("publish_address")
|
|
|
|
# malformed or no address given
|
|
if not address or ":" not in address:
|
|
return None
|
|
|
|
if "/" in address:
|
|
# Support 7.x host/ip:port behavior where http.publish_host has been set.
|
|
fqdn, ipaddress = address.split("/", 1)
|
|
host["host"] = fqdn
|
|
_, host["port"] = ipaddress.rsplit(":", 1)
|
|
host["port"] = int(host["port"])
|
|
|
|
else:
|
|
host["host"], host["port"] = address.rsplit(":", 1)
|
|
host["port"] = int(host["port"])
|
|
|
|
return self.host_info_callback(host_info, host)
|
|
|
|
def sniff_hosts(self, initial: bool = False) -> Any:
|
|
"""
|
|
Obtain a list of nodes from the cluster and create a new connection
|
|
pool using the information retrieved.
|
|
|
|
To extract the node connection parameters use the ``nodes_to_host_callback``.
|
|
|
|
:arg initial: flag indicating if this is during startup
|
|
(``sniff_on_start``), ignore the ``sniff_timeout`` if ``True``
|
|
"""
|
|
node_info = self._get_sniff_data(initial)
|
|
|
|
hosts: Any = list(filter(None, (self._get_host_info(n) for n in node_info)))
|
|
|
|
# we weren't able to get any nodes or host_info_callback blocked all -
|
|
# raise error.
|
|
if not hosts:
|
|
raise TransportError(
|
|
"N/A", "Unable to sniff hosts - no viable hosts found."
|
|
)
|
|
|
|
self.set_connections(hosts)
|
|
|
|
def mark_dead(self, connection: Connection) -> None:
|
|
"""
|
|
Mark a connection as dead (failed) in the connection pool. If sniffing
|
|
on failure is enabled this will initiate the sniffing process.
|
|
|
|
:arg connection: instance of :class:`~opensearchpy.Connection` that failed
|
|
"""
|
|
# mark as dead even when sniffing to avoid hitting this host during the sniff process
|
|
self.connection_pool.mark_dead(connection)
|
|
if self.sniff_on_connection_fail:
|
|
self.sniff_hosts()
|
|
|
|
def perform_request(
|
|
self,
|
|
method: str,
|
|
url: str,
|
|
params: Optional[Mapping[str, Any]] = None,
|
|
body: Any = None,
|
|
timeout: Optional[Union[int, float]] = None,
|
|
ignore: Collection[int] = (),
|
|
headers: Optional[Mapping[str, str]] = None,
|
|
) -> Any:
|
|
"""
|
|
Perform the actual request. Retrieve a connection from the connection
|
|
pool, pass all the information to its perform_request method and
|
|
return the data.
|
|
|
|
If an exception was raised, mark the connection as failed and retry (up
|
|
to `max_retries` times).
|
|
|
|
If the operation was successful and the connection used was previously
|
|
marked as dead, mark it as live, resetting its failure count.
|
|
|
|
:arg method: HTTP method to use
|
|
:arg url: absolute url (without host) to target
|
|
:arg headers: dictionary of headers, will be handed over to the
|
|
underlying :class:`~opensearchpy.Connection` class
|
|
:arg params: dictionary of query parameters, will be handed over to the
|
|
underlying :class:`~opensearchpy.Connection` class for serialization
|
|
:arg body: body of the request, will be serialized using serializer and
|
|
passed to the connection
|
|
"""
|
|
method, params, body, ignore, timeout = self._resolve_request_args(
|
|
method, params, body
|
|
)
|
|
|
|
for attempt in range(self.max_retries + 1):
|
|
connection = self.get_connection()
|
|
|
|
try:
|
|
status, headers_response, data = connection.perform_request(
|
|
method,
|
|
url,
|
|
params,
|
|
body,
|
|
headers=headers,
|
|
ignore=ignore,
|
|
timeout=timeout,
|
|
)
|
|
|
|
# Lowercase all the header names for consistency in accessing them.
|
|
headers_response = {
|
|
header.lower(): value for header, value in headers_response.items()
|
|
}
|
|
|
|
except TransportError as e:
|
|
if method == "HEAD" and e.status_code == 404:
|
|
return False
|
|
|
|
retry = False
|
|
if isinstance(e, ConnectionTimeout):
|
|
retry = self.retry_on_timeout
|
|
elif isinstance(e, ConnectionError):
|
|
retry = True
|
|
elif e.status_code in self.retry_on_status:
|
|
retry = True
|
|
|
|
if retry:
|
|
try:
|
|
# only mark as dead if we are retrying
|
|
self.mark_dead(connection)
|
|
except TransportError:
|
|
# If sniffing on failure, it could fail too. Catch the
|
|
# exception not to interrupt the retries.
|
|
pass
|
|
# raise exception on last retry
|
|
if attempt == self.max_retries:
|
|
raise e
|
|
else:
|
|
raise e
|
|
|
|
else:
|
|
# connection didn't fail, confirm its live status
|
|
self.connection_pool.mark_live(connection)
|
|
|
|
if method == "HEAD":
|
|
return 200 <= status < 300
|
|
|
|
if data:
|
|
data = self.deserializer.loads(
|
|
data, headers_response.get("content-type")
|
|
)
|
|
return data
|
|
|
|
def close(self) -> Any:
|
|
"""
|
|
Explicitly closes connections
|
|
"""
|
|
return self.connection_pool.close()
|
|
|
|
def _resolve_request_args(self, method: str, params: Any, body: Any) -> Any:
|
|
"""Resolves parameters for .perform_request()"""
|
|
if body is not None:
|
|
body = self.serializer.dumps(body)
|
|
|
|
# some clients or environments don't support sending GET with body
|
|
if method in ("HEAD", "GET") and self.send_get_body_as != "GET":
|
|
# send it as post instead
|
|
if self.send_get_body_as == "POST":
|
|
method = "POST"
|
|
|
|
# or as source parameter
|
|
elif self.send_get_body_as == "source":
|
|
if params is None:
|
|
params = {}
|
|
params["source"] = body
|
|
body = None
|
|
|
|
if body is not None:
|
|
try:
|
|
body = body.encode("utf-8", "surrogatepass")
|
|
except (UnicodeDecodeError, AttributeError):
|
|
# bytes/str - no need to re-encode
|
|
pass
|
|
|
|
ignore = ()
|
|
timeout = None
|
|
if params:
|
|
timeout = params.pop("request_timeout", None)
|
|
if not timeout:
|
|
timeout = params.pop("timeout", None)
|
|
ignore = params.pop("ignore", ())
|
|
if isinstance(ignore, int):
|
|
ignore = (ignore,)
|
|
|
|
return method, params, body, ignore, timeout
|
|
|
|
|
|
__all__ = ["TransportError"]
|