Commit caaf500b authored by Lukáš Lalinský's avatar Lukáš Lalinský

Better error handling (by parsing the error page)

parent e4eed514
......@@ -14,6 +14,7 @@
"""Implementation of the JSON-over-HTTP RPC protocol used by Avatica."""
import re
import socket
import httplib
import pprint
......@@ -21,7 +22,7 @@ import json
import logging
import urlparse
from HTMLParser import HTMLParser
from phoenixdb.errors import OperationalError, InternalError
from phoenixdb import errors
__all__ = ['AvaticaClient']
......@@ -40,9 +41,7 @@ class JettyErrorPageParser(HTMLParser):
self.path.append(tag)
def handle_endtag(self, tag):
top_tag = self.path.pop()
if tag != top_tag:
raise Exception('mismatched tags')
self.path.pop()
def handle_data(self, data):
if len(self.path) > 2 and self.path[0] == 'html' and self.path[1] == 'body':
......@@ -62,6 +61,35 @@ def parse_url(url):
return url
# Defined in phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
SQLSTATE_ERROR_CLASSES = [
('08', errors.OperationalError), # Connection Exception
('22018', errors.IntegrityError), # Constraint violatioin.
('22', errors.DataError), # Data Exception
('23', errors.IntegrityError), # Constraint Violation
('24', errors.InternalError), # Invalid Cursor State
('25', errors.InternalError), # Invalid Transaction State
('42', errors.ProgrammingError), # Syntax Error or Access Rule Violation
('XLC', errors.OperationalError), # Execution exceptions
('INT', errors.InternalError), # Phoenix internal error
]
def parse_error_page(html):
parser = JettyErrorPageParser()
parser.feed(html)
if parser.title == ['HTTP ERROR: 500']:
message = ' '.join(parser.message).strip()
match = re.match(r'^([^ ]+): ERROR (\d+) \(([0-9A-Z]{5})\): (.*?)$', message)
if match is not None:
exception, code, sqlstate, message = match.groups()
code = int(code)
for prefix, error_class in SQLSTATE_ERROR_CLASSES:
if sqlstate.startswith(prefix):
raise error_class(message, code, sqlstate)
raise errors.InternalError(message)
class AvaticaClient(object):
"""Client for Avatica's RPC server.
......@@ -87,7 +115,7 @@ class AvaticaClient(object):
self.connection = httplib.HTTPConnection(self.url.hostname, self.url.port)
self.connection.connect()
except (httplib.HTTPException, socket.error) as e:
raise OperationalError('Unable to connect to the specified service', e)
raise errors.InterfaceError('Unable to connect to the specified service', e)
def close(self):
"""Closes the HTTP connection to the RPC server."""
......@@ -99,44 +127,38 @@ class AvaticaClient(object):
logger.warning("Error while closing connection", exc_info=True)
self.connection = None
def _parse_error_page(self, body):
parser = JettyErrorPageParser()
parser.feed(body)
if parser.title == ['HTTP ERROR: 500']:
raise OperationalError(' '.join(parser.message))
def _apply(self, request_data, expected_response_type=None):
logger.debug("Sending request\n%s", pprint.pformat(request_data))
try:
self.connection.request('POST', self.url.path, headers={'request': json.dumps(request_data)})
response = self.connection.getresponse()
except httplib.HTTPException as e:
raise OperationalError('RPC request failed', e)
raise errors.InterfaceError('RPC request failed', cause=e)
response_body = response.read()
if response.status != httplib.OK:
logger.debug("Received response\n%s", response_body)
if '<html>' in response_body:
self._parse_error_page(response_body)
raise OperationalError('RPC request returned invalid status code', response.status)
parse_error_page(response_body)
raise errors.InterfaceError('RPC request returned invalid status code', response.status)
try:
response_data = json.loads(response_body)
except ValueError as e:
logger.debug("Received response\n%s", response_body)
raise InternalError('valid JSON document', e)
raise errors.InterfaceError('valid JSON document', cause=e)
logger.debug("Received response\n%s", pprint.pformat(response_data))
if 'response' not in response_data:
raise InternalError('missing response type')
raise errors.InterfaceError('missing response type')
if expected_response_type is None:
expected_response_type = request_data['request']
if response_data['response'] != expected_response_type:
raise InternalError('unexpected response type', response_data['response'])
raise errors.InterfaceError('unexpected response type "{}"'.format(response_data['response']))
return response_data
......
......@@ -20,40 +20,68 @@ __all__ = [
class Warning(StandardError):
pass
"""Not used by this package, only defined for compatibility
with DB API 2.0."""
class Error(StandardError):
pass
"""Exception that is the base class of all other error exceptions.
You can use this to catch all errors with one single except statement."""
def __init__(self, message, code=None, sqlstate=None, cause=None):
super(StandardError, self).__init__(message, code, sqlstate, cause)
@property
def message(self):
return self.args[0]
@property
def code(self):
return self.args[1]
@property
def sqlstate(self):
return self.args[2]
@property
def cause(self):
return self.args[3]
class InterfaceError(Error):
pass
"""Exception raised for errors that are related to the database
interface rather than the database itself."""
class DatabaseError(Error):
pass
"""Exception raised for errors that are related to the database."""
class DataError(DatabaseError):
pass
"""Exception raised for errors that are due to problems with the
processed data like division by zero, numeric value out of range,
etc."""
class OperationalError(DatabaseError):
pass
"""Raised for errors that are related to the database's operation and not
necessarily under the control of the programmer, e.g. an unexpected
disconnect occurs, the data source name is not found, a transaction could
not be processed, a memory allocation error occurred during
processing, etc."""
class IntegrityError(DatabaseError):
pass
"""Raised when the relational integrity of the database is affected, e.g. a foreign key check fails."""
class InternalError(DatabaseError):
pass
"""Raised when the database encounters an internal problem."""
class ProgrammingError(DatabaseError):
pass
"""Raises for programming errors, e.g. table not found, syntax error, etc."""
class NotSupportedError(DatabaseError):
pass
"""Raised when using an API that is not supported by the database."""
import os
import unittest
import phoenixdb
TEST_DB_URL = os.environ.get('PHOENIXDB_TEST_DB_URL')
@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database")
class DatabaseTestCase(unittest.TestCase):
def setUp(self):
self.conn = phoenixdb.connect(TEST_DB_URL, autocommit=True)
self.cleanup_tables = []
def tearDown(self):
self.doCleanups()
self.conn.close()
def addTableCleanup(self, name):
def dropTable():
with self.conn.cursor() as cursor:
cursor.execute("DROP TABLE IF EXISTS {}".format(name))
self.addCleanup(dropTable)
def createTable(self, name, columns):
with self.conn.cursor() as cursor:
cursor.execute("DROP TABLE IF EXISTS {}".format(name))
cursor.execute("CREATE TABLE {} ({})".format(name, columns))
self.addTableCleanup(name)
......@@ -84,7 +84,7 @@ class PhoenixDatabaseAPI20Test(dbapi20.DatabaseAPI20Test):
# no rows
cur.execute('select name from %sbooze' % self.table_prefix)
self.assertRaises(StopIteration,cur.next)
_failUnless(self,cur.rowcount in (-1,0))
self.failUnless(cur.rowcount in (-1,0))
# cursor.next should raise an Error if called after
# executing a query that cannnot return rows
......@@ -103,6 +103,6 @@ class PhoenixDatabaseAPI20Test(dbapi20.DatabaseAPI20Test):
)
# cursor.next should raise StopIteration if no more rows available
self.assertRaises(StopIteration,cur.next)
_failUnless(self,cur.rowcount in (-1,1))
self.failUnless(cur.rowcount in (-1,1))
finally:
con.close()
from phoenixdb.tests import DatabaseTestCase
class ProgrammingErrorTest(DatabaseTestCase):
def test_invalid_sql(self):
with self.conn.cursor() as cursor:
with self.assertRaises(self.conn.ProgrammingError) as cm:
cursor.execute("UPS")
self.assertEqual("Syntax error. Encountered \"UPS\" at line 1, column 1.", cm.exception.message)
self.assertEqual(601, cm.exception.code)
self.assertEqual("42P00", cm.exception.sqlstate)
class IntegrityErrorTest(DatabaseTestCase):
def test_null_in_pk(self):
self.createTable("phoenixdb_test_tbl1", "id integer primary key")
with self.conn.cursor() as cursor:
with self.assertRaises(self.conn.IntegrityError) as cm:
cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (NULL)")
self.assertEqual("Constraint violatioin. PHOENIXDB_TEST_TBL1.ID may not be null", cm.exception.message)
self.assertEqual(218, cm.exception.code)
self.assertEqual("22018", cm.exception.sqlstate)
class DataErrorTest(DatabaseTestCase):
def test_number_outside_of_range(self):
self.createTable("phoenixdb_test_tbl1", "id tinyint primary key")
with self.conn.cursor() as cursor:
with self.assertRaises(self.conn.DataError) as cm:
cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (10000)")
self.assertEqual("Type mismatch. TINYINT and INTEGER for 10000", cm.exception.message)
self.assertEqual(203, cm.exception.code)
self.assertEqual("22005", cm.exception.sqlstate)
def test_division_by_zero(self):
self.createTable("phoenixdb_test_tbl1", "id integer primary key")
with self.conn.cursor() as cursor:
with self.assertRaises(self.conn.DataError) as cm:
cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2/0)")
self.assertEqual("Divide by zero.", cm.exception.message)
self.assertEqual(202, cm.exception.code)
self.assertEqual("22012", cm.exception.sqlstate)
import unittest
import phoenixdb
from phoenixdb.tests import TEST_DB_URL
@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database")
class PhoenixTypesTest(unittest.TestCase):
def setUp(self):
self.conn = phoenixdb.connect(TEST_DB_URL, autocommit=True)
self.cleanup_tables = []
def tearDown(self):
self.doCleanups()
self.conn.close()
def addTableCleanup(self, name):
def dropTable():
with self.conn.cursor() as cursor:
cursor.execute("DROP TABLE IF EXISTS {}".format(name))
self.addCleanup(dropTable)
def createTable(self, name, columns):
with self.conn.cursor() as cursor:
cursor.execute("DROP TABLE IF EXISTS {}".format(name))
cursor.execute("CREATE TABLE {} ({})".format(name, columns))
self.addTableCleanup(name)
from phoenixdb.tests import DatabaseTestCase
class TypesTest(DatabaseTestCase):
def checkIntType(self, type_name, min_value, max_value):
self.createTable("phoenixdb_test_tbl1", "id integer primary key, val {}".format(type_name))
......@@ -37,8 +17,8 @@ class PhoenixTypesTest(unittest.TestCase):
cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
self.assertEqual(cursor.description[1].type_code, phoenixdb.NUMBER)
self.assertEqual(cursor.fetchall(), [[1, 1], [2, None], [3, 1], [4, None], [5, min_value], [6, max_value]])
self.assertRaises(phoenixdb.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [min_value - 1])
self.assertRaises(phoenixdb.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [max_value + 1])
self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [min_value - 1])
self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [max_value + 1])
def test_integer(self):
self.checkIntType("integer", -2147483648, 2147483647)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment