Only trigger retry and mark connection as dead on connection errors
Not when we receive an error code from ES.
This commit is contained in:
@@ -3,7 +3,7 @@ import time
|
||||
import requests
|
||||
import json
|
||||
|
||||
from .exceptions import TransportError, HTTP_EXCEPTIONS
|
||||
from .exceptions import TransportError, HTTP_EXCEPTIONS, ConnectionError
|
||||
|
||||
logger = logging.getLogger('elasticsearch')
|
||||
tracer = logging.getLogger('elasticsearch.trace')
|
||||
@@ -61,9 +61,9 @@ class RequestsHttpConnection(Connection):
|
||||
response = self.session.send(request)
|
||||
duration = time.time() - start
|
||||
raw_data = response.text
|
||||
except requests.ConnectionError as e:
|
||||
except (requests.ConnectionError, requests.Timeout) as e:
|
||||
self.log_request_fail(method, request.url, time.time() - start, exception=e)
|
||||
raise TransportError(e)
|
||||
raise ConnectionError(e)
|
||||
|
||||
# raise errors based on http status codes, let the client handle those if needed
|
||||
if not (200 <= response.status_code < 300):
|
||||
|
||||
@@ -12,6 +12,10 @@ class TransportError(ElastiSearchException):
|
||||
pass
|
||||
|
||||
|
||||
class ConnectionError(TransportError):
|
||||
pass
|
||||
|
||||
|
||||
class NotFoundError(TransportError):
|
||||
" 404 "
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ import re
|
||||
from .connection import RequestsHttpConnection
|
||||
from .connection_pool import ConnectionPool
|
||||
from .serializer import JSONSerializer
|
||||
from .exceptions import TransportError
|
||||
from .exceptions import ConnectionError
|
||||
|
||||
# get ip/port from "inet[wind/127.0.0.1:9200]"
|
||||
ADDRESS_RE = re.compile(r'/(?P<host>[^:]*):(?P<port>[0-9]+)\]')
|
||||
@@ -196,8 +196,7 @@ class Transport(object):
|
||||
|
||||
try:
|
||||
status, raw_data = connection.perform_request(method, url, params, body)
|
||||
except TransportError:
|
||||
# TODO: don't retry on client errors etc
|
||||
except ConnectionError:
|
||||
self.mark_dead(connection, dead_count + 1, sniffing)
|
||||
|
||||
# raise exception on last retry
|
||||
|
||||
@@ -2,7 +2,7 @@ from unittest import TestCase
|
||||
|
||||
from elasticsearch.transport import Transport
|
||||
from elasticsearch.connection import Connection
|
||||
from elasticsearch.exceptions import TransportError
|
||||
from elasticsearch.exceptions import TransportError, ConnectionError
|
||||
|
||||
class DummyConnection(Connection):
|
||||
def __init__(self, **kwargs):
|
||||
@@ -58,15 +58,15 @@ class TestTransport(TestCase):
|
||||
self.assertEquals('http://google.com:9200', t.connection_pool.connections[1].host)
|
||||
|
||||
def test_request_will_fail_after_X_retries(self):
|
||||
t = Transport([{'exception': TransportError('abandon ship')}], connection_class=DummyConnection)
|
||||
t = Transport([{'exception': ConnectionError('abandon ship')}], connection_class=DummyConnection)
|
||||
|
||||
self.assertRaises(TransportError, t.perform_request, 'GET', '/')
|
||||
self.assertRaises(ConnectionError, t.perform_request, 'GET', '/')
|
||||
self.assertEquals(3, len(t.get_connection()[0].calls))
|
||||
|
||||
def test_failed_connection_will_be_marked_as_dead(self):
|
||||
t = Transport([{'exception': TransportError('abandon ship')}], connection_class=DummyConnection)
|
||||
t = Transport([{'exception': ConnectionError('abandon ship')}], connection_class=DummyConnection)
|
||||
|
||||
self.assertRaises(TransportError, t.perform_request, 'GET', '/')
|
||||
self.assertRaises(ConnectionError, t.perform_request, 'GET', '/')
|
||||
self.assertEquals(0, len(t.connection_pool.connections))
|
||||
|
||||
def test_resurrected_connection_will_be_marked_as_live_on_success(self):
|
||||
@@ -84,10 +84,10 @@ class TestTransport(TestCase):
|
||||
self.assertEquals('http://1.1.1.1:123', t.get_connection()[0].host)
|
||||
|
||||
def test_sniff_on_fail_triggers_sniffing_on_fail(self):
|
||||
t = Transport([{'exception': TransportError('abandon ship')}, {"data": CLUSTER_NODES}],
|
||||
t = Transport([{'exception': ConnectionError('abandon ship')}, {"data": CLUSTER_NODES}],
|
||||
connection_class=DummyConnection, sniff_on_connection_fail=True, max_retries=1, randomize_hosts=False)
|
||||
|
||||
self.assertRaises(TransportError, t.perform_request, 'GET', '/')
|
||||
self.assertRaises(ConnectionError, t.perform_request, 'GET', '/')
|
||||
self.assertEquals(1, len(t.connection_pool.connections))
|
||||
self.assertEquals('http://1.1.1.1:123', t.get_connection()[0].host)
|
||||
|
||||
@@ -105,11 +105,11 @@ class TestTransport(TestCase):
|
||||
self.assertEquals('http://1.1.1.1:123', t.get_connection()[0].host)
|
||||
|
||||
def test_sniff_on_failure_shortens_sniff_after_n_requests(self):
|
||||
t = Transport([{'exception': TransportError('abandon ship')}, {"data": CLUSTER_NODES}],
|
||||
t = Transport([{'exception': ConnectionError('abandon ship')}, {"data": CLUSTER_NODES}],
|
||||
connection_class=DummyConnection, sniff_on_connection_fail=True, max_retries=1,
|
||||
randomize_hosts=False, sniff_after_requests=4)
|
||||
|
||||
self.assertRaises(TransportError, t.perform_request, 'GET', '/')
|
||||
self.assertRaises(ConnectionError, t.perform_request, 'GET', '/')
|
||||
self.assertEquals(1, len(t.connection_pool.connections))
|
||||
self.assertEquals('http://1.1.1.1:123', t.get_connection()[0].host)
|
||||
self.assertEquals(3, t.sniff_after_requests)
|
||||
|
||||
Reference in New Issue
Block a user