Simpler, easier-to-use callback for extracting node information

This commit is contained in:
Honza Kral
2013-05-22 18:45:55 +02:00
parent 0dae1e9df1
commit fecd26193c
+34 -21
View File
@@ -8,22 +8,21 @@ from .exceptions import ConnectionError
# get ip/port from "inet[wind/127.0.0.1:9200]"
ADDRESS_RE = re.compile(r'/(?P<host>[^:]*):(?P<port>[0-9]+)\]')
def construct_hosts_list(nodes, transport):
"""
Simple callback that trasnforms the output of `/_cluster/nodes` to a format
accepted by :class:`~elasticsearch.Transport`.
:arg nodes: deserialized output of the API call
:arg transport: the transport schema used (usually 'http'). Used to locate
appropriate address in the node info.
def get_host_info(node_info, host):
"""
hosts = []
address = '%s_address' % transport
for n in nodes.values():
match = ADDRESS_RE.search(n.get(address, ''))
if match:
hosts.append(match.groupdict())
return hosts
Simple callback that takes the node info from `/_cluster/nodes` and the
parsed connection information and returns the connection information. If
`None` is returned this node will be skipped.
Useful for filtering nodes (by proximity for example) or if additional
information needs to be provided for the :class:`~elasticsearch.Connection`
class.
:arg node_info: node information from `/_cluster/nodes`
:arg host: connection information (host, port) extracted from the node info
"""
return host
class Transport(object):
"""
@@ -33,7 +32,7 @@ class Transport(object):
Main interface is the `perform_request` method.
"""
def __init__(self, hosts, connection_class=RequestsHttpConnection,
connection_pool_class=ConnectionPool, nodes_to_host_callback=construct_hosts_list,
connection_pool_class=ConnectionPool, host_info_callback=get_host_info,
sniff_on_start=False, sniff_after_requests=None,
sniff_on_connection_fail=False, serializer=JSONSerializer(),
max_retries=3, **kwargs):
@@ -42,9 +41,9 @@ class Transport(object):
create a `connection_class` instance
:arg connection_class: subclass of :class:`~elasticsearch.Connection` to use
:arg connection_pool_class: subclass of :class:`~elasticsearch.ConnectionPool` to use
:arg nodes_to_host_callback: callback responsible for taking the output
of `/_cluser/nodes` and producing a list of arguments (same as `hosts`
parameter)
:arg host_info_callback: callback responsible for taking the node information from
`/_cluster/nodes`, along with already extracted information, and
producing the connection arguments for that node (same format as an entry
of the `hosts` parameter), or `None` to skip the node
:arg sniff_on_start: flag indicating whether to obtain a list of nodes
from the cluster at startup time
:arg sniff_after_requests: number of requests after which sniffing should be initiated
@@ -81,8 +80,8 @@ class Transport(object):
self.sniff_after_requests = sniff_after_requests
self.sniff_on_connection_fail = sniff_on_connection_fail
# callback to construct hosts dicts from /_cluster/nodes data
self.nodes_to_host_callback = nodes_to_host_callback
# callback to construct host dict from data in /_cluster/nodes
self.host_info_callback = host_info_callback
if sniff_on_start:
self.sniff_hosts()
@@ -139,7 +138,21 @@ class Transport(object):
# set the counter to 0 first so that other threads won't sniff as well
self.req_counter = 0
_, node_info = self.perform_request('GET', '/_cluster/nodes', sniffing=True)
hosts = self.nodes_to_host_callback(node_info['nodes'], self.connection_class.transport_schema)
hosts = []
address = self.connection_class.transport_schema + '_address'
for n in node_info['nodes'].values():
match = ADDRESS_RE.search(n.get(address, ''))
if not match:
continue
host = match.groupdict()
if 'port' in host:
host['port'] = int(host['port'])
host = self.host_info_callback(n, host)
if host is not None:
hosts.append(host)
self.set_connections(hosts)
# when sniffing due to failure, shorten the period between sniffs progressively