Commit e38e7bdf authored by Lukáš Lalinský's avatar Lukáš Lalinský

Reconnect on HTTP connection errors

parent 60404c29
......@@ -41,7 +41,7 @@ For example::
"""
def connect(url, **kwargs):
def connect(url, max_retries=None, **kwargs):
"""Connects to a Phoenix query server.
:param url:
......@@ -53,9 +53,12 @@ def connect(url, **kwargs):
:param readonly:
Switch the connection to readonly mode.
:param max_retries:
The maximum number of retries in case there is a connection error.
:returns:
:class:`~phoenixdb.connection.Connection` object.
"""
client = AvaticaClient(url)
client = AvaticaClient(url, max_retries=max_retries)
client.connect()
return Connection(client, **kwargs)
......@@ -19,8 +19,10 @@ import socket
import httplib
import pprint
import json
import math
import logging
import urlparse
import time
from decimal import Decimal
from HTMLParser import HTMLParser
from phoenixdb import errors
......@@ -105,7 +107,7 @@ class AvaticaClient(object):
to a server using :func:`phoenixdb.connect`.
"""
def __init__(self, url, version=None):
def __init__(self, url, version=None, max_retries=None):
"""Constructs a new client object.
:param url:
......@@ -126,6 +128,7 @@ class AvaticaClient(object):
self.version = AVATICA_1_3_0
elif v in ('1.4.0', '1.4'):
self.version = AVATICA_1_4_0
self.max_retries = max_retries if max_retries is not None else 3
self.connection = None
def connect(self):
......@@ -147,6 +150,33 @@ class AvaticaClient(object):
logger.warning("Error while closing connection", exc_info=True)
self.connection = None
def _post_request(self, body, headers):
retry_count = self.max_retries
while True:
logger.debug("POST %s %r %r", self.url.path, body, headers)
try:
self.connection.request('POST', self.url.path, body=body, headers=headers)
response = self.connection.getresponse()
except httplib.HTTPException as e:
if retry_count > 0:
delay = math.exp(-retry_count)
logger.debug("HTTP protocol error, will retry in %s seconds...", delay, exc_info=True)
self.close()
self.connect()
time.sleep(delay)
retry_count -= 1
continue
raise errors.InterfaceError('RPC request failed', cause=e)
else:
if response.status == httplib.SERVICE_UNAVAILABLE:
if retry_count > 0:
delay = math.exp(-retry_count)
logger.debug("Service unavailable, will retry in %s seconds...", delay, exc_info=True)
time.sleep(delay)
retry_count -= 1
continue
return response
def _apply(self, request_data, expected_response_type=None):
logger.debug("Sending request\n%s", pprint.pformat(request_data))
......@@ -169,13 +199,7 @@ class AvaticaClient(object):
body = None
headers = {'request': json.dumps(request_data, default=default)}
logger.debug("POST %s %r %r", self.url.path, body, headers)
try:
self.connection.request('POST', self.url.path, body=body, headers=headers)
response = self.connection.getresponse()
except httplib.HTTPException as e:
raise errors.InterfaceError('RPC request failed', cause=e)
response = self._post_request(body, headers)
response_body = response.read()
if response.status != httplib.OK:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment