Sniff based on time interval, not number of requests
This commit is contained in:
+16
-10
@@ -1,4 +1,5 @@
|
||||
import re
|
||||
import time
|
||||
|
||||
from .connection import RequestsHttpConnection
|
||||
from .connection_pool import ConnectionPool
|
||||
@@ -33,7 +34,7 @@ class Transport(object):
|
||||
"""
|
||||
def __init__(self, hosts, connection_class=RequestsHttpConnection,
|
||||
connection_pool_class=ConnectionPool, host_info_callback=get_host_info,
|
||||
sniff_on_start=False, sniff_after_requests=None,
|
||||
sniff_on_start=False, sniffer_timeout=None,
|
||||
sniff_on_connection_fail=False, serializer=JSONSerializer(),
|
||||
max_retries=3, **kwargs):
|
||||
"""
|
||||
@@ -46,7 +47,7 @@ class Transport(object):
|
||||
producing a list of arguments (same as `hosts` parameter)
|
||||
:arg sniff_on_start: flag indicating whether to obtain a list of nodes
|
||||
from the cluser at startup time
|
||||
:arg sniff_after_requests: number of requests after which a sniffing should be initialized
|
||||
:arg sniffer_timeout: number of seconds between automatic sniffs
|
||||
:arg sniff_on_connection_fail: flasg controlling if connection failure triggers a sniff
|
||||
:arg serializer: serializer instance
|
||||
:arg max_retries: maximum number of retries before an exception is propagated
|
||||
@@ -73,9 +74,9 @@ class Transport(object):
|
||||
self.set_connections(hosts)
|
||||
|
||||
# sniffing data
|
||||
self.req_counter = 0
|
||||
self.sniff_after_requests = sniff_after_requests
|
||||
self.sniffer_timeout = sniffer_timeout
|
||||
self.sniff_on_connection_fail = sniff_on_connection_fail
|
||||
self.last_sniff = time.time()
|
||||
|
||||
# callback to construct host dict from data in /_cluster/nodes
|
||||
self.host_info_callback = host_info_callback
|
||||
@@ -116,10 +117,9 @@ class Transport(object):
|
||||
:arg sniffing: flag indicating that the connection will be used for
|
||||
sniffing for nodes
|
||||
"""
|
||||
if not sniffing and self.sniff_after_requests:
|
||||
if self.req_counter >= self.sniff_after_requests:
|
||||
if not sniffing and self.sniffer_timeout:
|
||||
if time.time() >= self.last_sniff + self.sniffer_timeout:
|
||||
self.sniff_hosts()
|
||||
self.req_counter += 1
|
||||
return self.connection_pool.get_connection()
|
||||
|
||||
def sniff_hosts(self, failure=False):
|
||||
@@ -132,9 +132,15 @@ class Transport(object):
|
||||
:arg failure: indicates whether this sniffing was initiated because of
|
||||
a connection failure
|
||||
"""
|
||||
# set the counter to 0 first so that other threads won't sniff as well
|
||||
self.req_counter = 0
|
||||
_, node_info = self.perform_request('GET', '/_cluster/nodes', sniffing=True)
|
||||
previous_sniff = self.last_sniff
|
||||
try:
|
||||
# reset last_sniff timestamp
|
||||
self.last_sniff = time.time()
|
||||
_, node_info = self.perform_request('GET', '/_cluster/nodes', sniffing=True)
|
||||
except:
|
||||
# keep the previous value on error
|
||||
self.last_sniff = previous_sniff
|
||||
raise
|
||||
|
||||
hosts = []
|
||||
address = self.connection_class.transport_schema + '_address'
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import time
|
||||
from unittest import TestCase
|
||||
|
||||
from elasticsearch.transport import Transport
|
||||
@@ -91,16 +92,18 @@ class TestTransport(TestCase):
|
||||
self.assertEquals(1, len(t.connection_pool.connections))
|
||||
self.assertEquals('http://1.1.1.1:123', t.get_connection()[0].host)
|
||||
|
||||
def test_sniff_after_n_requests(self):
|
||||
def test_sniff_after_n_seconds(self):
|
||||
t = Transport([{"data": CLUSTER_NODES}],
|
||||
connection_class=DummyConnection, sniff_after_requests=5)
|
||||
connection_class=DummyConnection, sniffer_timeout=5)
|
||||
|
||||
for _ in range(4):
|
||||
t.perform_request('GET', '/')
|
||||
self.assertEquals(1, len(t.connection_pool.connections))
|
||||
self.assertIsInstance(t.get_connection()[0], DummyConnection)
|
||||
t.last_sniff = time.time() - 5.1
|
||||
|
||||
t.perform_request('GET', '/')
|
||||
self.assertEquals(1, len(t.connection_pool.connections))
|
||||
self.assertEquals('http://1.1.1.1:123', t.get_connection()[0].host)
|
||||
self.assertTrue(time.time() - 1 < t.last_sniff < time.time() + 0.01 )
|
||||
|
||||
|
||||
Reference in New Issue
Block a user