Commit 0bdd853b authored by Lukáš Lalinský's avatar Lukáš Lalinský

Compatibility with Avatica 1.4 plus a number of improvements coming with that

parent 696dde5e
......@@ -43,6 +43,4 @@ Known problems:
* "Transaction" support, i.e. non-autocommit mode. Needs support in the Avatica RPC server first. (`CALCITE-767 <https://issues.apache.org/jira/browse/CALCITE-767>`_)
* Proper exception handling; currently it tries to parse the HTML error page it receives from the server. (`CALCITE-645 <https://issues.apache.org/jira/browse/CALCITE-645>`_)
* Can't pass long strings as parameters. (`CALCITE-780 <https://issues.apache.org/jira/browse/CALCITE-780>`_)
* Can't use BINARY(n) columns properly. You can insert non-binary data to a BINARY column, but you can't insert arbitrary binary data, just a set of characters that can be encoded in JSON strings. (`CALCITE-781 <https://issues.apache.org/jira/browse/CALCITE-781>`_)
* Can't use TIME/DATE columns. The server returns incomplete data and expects different format on input and output. (`discussion <http://mail-archives.apache.org/mod_mbox/phoenix-user/201506.mbox/%3CCAGUtLj8HDeq7chOSTz%3DVznB-v79%3DCmJ5%3Dt1N9Bbe4wE_m1%3D3zg%40mail.gmail.com%3E>`_)
......@@ -91,9 +91,9 @@ def parse_error_page(html):
raise errors.InternalError(message)
AVATICA_1_2_0_INCUBATING = (1, 2, 0, 'incubating')
AVATICA_1_3_0_INCUBATING = (1, 3, 0, 'incubating')
AVATICA_1_4_0_INCUBATING = (1, 4, 0, 'incubating')
AVATICA_1_2_0 = (1, 2, 0)
AVATICA_1_3_0 = (1, 3, 0)
AVATICA_1_4_0 = (1, 4, 0)
class AvaticaClient(object):
......@@ -114,7 +114,18 @@ class AvaticaClient(object):
Version of the Avatica RPC server.
"""
self.url = parse_url(url)
self.version = version or AVATICA_1_2_0_INCUBATING
if version is not None:
self.version = version
else:
self.version = AVATICA_1_2_0
query = urlparse.parse_qs(self.url.query)
for v in query.get('v', []):
if v in ('1.2.0', '1.2'):
self.version = AVATICA_1_2_0
elif v in ('1.3.0', '1.3'):
self.version = AVATICA_1_3_0
elif v in ('1.4.0', '1.4'):
self.version = AVATICA_1_4_0
self.connection = None
def connect(self):
......@@ -151,13 +162,14 @@ class AvaticaClient(object):
return FakeFloat(obj)
raise TypeError
if self.version >= AVATICA_1_4_0_INCUBATING:
if self.version >= AVATICA_1_4_0:
body = json.dumps(request_data, default=default)
headers = {'content-type': 'application/json'}
else:
body = None
headers = {'request': json.dumps(request_data, default=default)}
logger.debug("POST %s %r %r", self.url.path, body, headers)
try:
self.connection.request('POST', self.url.path, body=body, headers=headers)
response = self.connection.getresponse()
......@@ -328,7 +340,7 @@ class AvaticaClient(object):
'sql': sql,
'maxRowCount': maxRowCount,
}
if self.version >= AVATICA_1_4_0_INCUBATING:
if self.version >= AVATICA_1_4_0:
request['statementId'] = statementId
return self._apply(request, 'Service$ExecuteResponse')['results']
......@@ -356,8 +368,8 @@ class AvaticaClient(object):
'sql': sql,
'maxRowCount': maxRowCount,
}
if self.version >= AVATICA_1_4_0_INCUBATING:
request['statementId'] = statementId
#if self.version >= AVATICA_1_4_0:
# request['statementId'] = statementId
return self._apply(request)['statement']
def fetch(self, connectionId, statementId, parameterValues=None, offset=0, fetchMaxRowCount=-1):
......@@ -389,9 +401,13 @@ class AvaticaClient(object):
'request': 'fetch',
'connectionId': connectionId,
'statementId': statementId,
'parameterValues': parameterValues,
'offset': offset,
'fetchMaxRowCount': fetchMaxRowCount,
}
if self.version < AVATICA_1_3_0:
# XXX won't work for all types, but oh well...
request['parameterValues'] = [v['value'] for v in parameterValues]
else:
request['parameterValues'] = parameterValues
return self._apply(request)['frame']
......@@ -16,6 +16,7 @@ import logging
import collections
import base64
from decimal import Decimal
from phoenixdb.types import Binary
from phoenixdb.errors import OperationalError, NotSupportedError, ProgrammingError
__all__ = ['Cursor', 'ColumnDescription']
......@@ -51,7 +52,7 @@ class Cursor(object):
self._connection = connection
self._id = id
self._signature = None
self._data_types = []
self._column_data_types = []
self._frame = None
self._pos = None
self._closed = False
......@@ -92,7 +93,7 @@ class Cursor(object):
self._connection._client.closeStatement(self._connection._id, self._id)
self._id = None
self._signature = None
self._data_types = []
self._column_data_types = []
self._frame = None
self._pos = None
self._closed = True
......@@ -126,17 +127,53 @@ class Cursor(object):
def _set_signature(self, signature):
    """Store the statement signature and derive the type conversion tables.

    ``self._column_data_types`` is rebuilt as a list of
    ``(column_index, converter)`` pairs for result columns whose values
    need converting (BigDecimal -> ``Decimal``, Float/Double -> ``float``,
    BINARY -> ``base64.b64decode``).

    ``self._parameter_data_types`` is rebuilt as a list of
    ``(type_name, converter)`` pairs, one per statement parameter, in
    declaration order.  ``converter`` is ``None`` unless the value has to
    be wrapped before it is sent (byte arrays go through ``Binary``).

    :param signature: signature dict with ``'columns'`` and
        ``'parameters'`` lists, or ``None`` to clear both tables.
    """
    self._signature = signature
    self._column_data_types = []
    self._parameter_data_types = []
    if signature is None:
        return

    for i, column in enumerate(signature['columns']):
        class_name = column['columnClassName']
        if class_name == 'java.math.BigDecimal':
            self._column_data_types.append((i, Decimal))
        elif class_name in ('java.lang.Float', 'java.lang.Double'):
            self._column_data_types.append((i, float))
        elif column['type']['name'] == 'BINARY':
            self._column_data_types.append((i, base64.b64decode))

    # Java class name -> type tag for parameters that need no conversion.
    plain_types = {
        'java.math.BigDecimal': 'NUMBER',
        'java.lang.Float': 'FLOAT',
        'java.lang.Double': 'DOUBLE',
        'java.lang.Long': 'LONG',
        'java.lang.Integer': 'INTEGER',
        'java.lang.Short': 'SHORT',
        'java.lang.Byte': 'BYTE',
        'java.lang.Boolean': 'BOOLEAN',
        'java.lang.String': 'STRING',
    }
    for parameter in signature['parameters']:
        class_name = parameter['className']
        if class_name == '[B':
            # JVM name for byte[]; these values are wrapped with Binary.
            self._parameter_data_types.append(('BYTE_STRING', Binary))
        else:
            # Anything unrecognised is sent as a generic OBJECT.
            type_name = plain_types.get(class_name, 'OBJECT')
            self._parameter_data_types.append((type_name, None))
def _transform_parameters(self, parameters):
    """Tag each parameter value with its declared type for the request.

    Values are matched positionally against the ``(type_name, converter)``
    pairs in ``self._parameter_data_types``; pairing stops at the shorter
    of the two sequences.  ``None`` is always sent as an ``OBJECT`` with a
    ``None`` value.  Any other value is passed through the converter, when
    one is set, and tagged with the type name.

    :returns: list of ``{'type': ..., 'value': ...}`` dicts.
    """
    def as_typed(raw, type_name, convert):
        if raw is None:
            return {'type': 'OBJECT', 'value': None}
        converted = raw if convert is None else convert(raw)
        return {'type': type_name, 'value': converted}

    pairs = zip(parameters, self._parameter_data_types)
    return [
        as_typed(raw, type_name, convert)
        for raw, (type_name, convert) in pairs
    ]
def _set_frame(self, frame):
self._frame = frame
......@@ -158,6 +195,8 @@ class Cursor(object):
raise ProgrammingError('the cursor is already closed')
self._updatecount = -1
if parameters is None:
if self._id is None:
self._set_id(self._connection._client.createStatement(self._connection._id))
results = self._connection._client.prepareAndExecute(self._connection._id, self._id,
operation, maxRowCount=self.itersize)
if results:
......@@ -173,7 +212,8 @@ class Cursor(object):
self._set_id(statement['id'])
self._set_signature(statement['signature'])
frame = self._connection._client.fetch(self._connection._id, self._id,
parameters, fetchMaxRowCount=self.itersize)
self._transform_parameters(parameters),
fetchMaxRowCount=self.itersize)
self._set_frame(frame)
def executemany(self, operation, seq_of_parameters):
......@@ -184,10 +224,11 @@ class Cursor(object):
statement = self._connection._client.prepare(self._connection._id, self._id,
operation, maxRowCount=0)
self._set_id(statement['id'])
self._signature = statement['signature']
self._set_signature(statement['signature'])
for parameters in seq_of_parameters:
self._connection._client.fetch(self._connection._id, self._id,
parameters, fetchMaxRowCount=0)
self._transform_parameters(parameters),
fetchMaxRowCount=0)
def fetchone(self):
if self._frame is None:
......@@ -201,7 +242,7 @@ class Cursor(object):
self._pos = None
if not self._frame['done']:
self._fetch_next_frame()
for i, data_type in self._data_types:
for i, data_type in self._column_data_types:
value = row[i]
if value is not None:
row[i] = data_type(value)
......
......@@ -2,6 +2,7 @@ import unittest
import phoenixdb
from decimal import Decimal
from phoenixdb.tests import DatabaseTestCase
from phoenixdb.avatica import AVATICA_1_2_0, AVATICA_1_3_0, AVATICA_1_4_0
class TypesTest(DatabaseTestCase):
......@@ -18,8 +19,10 @@ class TypesTest(DatabaseTestCase):
cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
self.assertEqual(cursor.description[1].type_code, phoenixdb.NUMBER)
self.assertEqual(cursor.fetchall(), [[1, 1], [2, None], [3, 1], [4, None], [5, min_value], [6, max_value]])
self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [min_value - 1])
self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [max_value + 1])
self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, {})".format(min_value - 1))
self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, {})".format(max_value + 1))
#self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [min_value - 1])
#self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [max_value + 1])
def test_integer(self):
self.checkIntType("integer", -2147483648, 2147483647)
......@@ -36,7 +39,6 @@ class TypesTest(DatabaseTestCase):
def test_tinyint(self):
self.checkIntType("tinyint", -128, 127)
@unittest.skip("https://issues.apache.org/jira/browse/PHOENIX-2082")
def test_unsigned_tinyint(self):
self.checkIntType("unsigned_tinyint", 0, 127)
......@@ -130,7 +132,6 @@ class TypesTest(DatabaseTestCase):
cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
self.assertEqual(cursor.fetchall(), [[1, 'abc'], [2, None], [3, 'abc'], [4, None], [5, None], [6, None]])
@unittest.skip("https://issues.apache.org/jira/browse/CALCITE-780")
def test_varchar_very_long(self):
self.createTable("phoenixdb_test_tbl1", "id integer primary key, val varchar")
with self.conn.cursor() as cursor:
......@@ -179,9 +180,9 @@ class TypesTest(DatabaseTestCase):
self.createTable("phoenixdb_test_tbl1", "id integer primary key, val binary(2)")
with self.conn.cursor() as cursor:
cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 'ab')")
cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", ['ab'])
cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [phoenixdb.Binary('ab')])
cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, '\x01\x00')")
cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", ['\x01\x00'])
cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [phoenixdb.Binary('\x01\x00')])
cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
self.assertEqual(cursor.fetchall(), [
[1, 'ab'],
......@@ -190,12 +191,14 @@ class TypesTest(DatabaseTestCase):
[4, '\x01\x00'],
])
@unittest.skip("https://issues.apache.org/jira/browse/CALCITE-781")
def test_binary2(self):
self.createTable("phoenixdb_test_tbl1", "id integer primary key, val binary(2)")
def test_binary_all_bytes(self):
if self.conn._client.version < AVATICA_1_4_0:
raise unittest.SkipTest('binary strings only work with Calcite >= 1.4.0')
self.createTable("phoenixdb_test_tbl1", "id integer primary key, val binary(256)")
with self.conn.cursor() as cursor:
value = ''
for i in range(256):
value += chr(i)
cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, ?)", [phoenixdb.Binary(value)])
self.assertEqual(cursor.fetchall(), [[1, value.encode('base64').strip()]])
cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
self.assertEqual(cursor.fetchall(), [[1, value]])
......@@ -14,6 +14,7 @@
import time
import datetime
import base64
__all__ = [
'Date', 'Time', 'Timestamp', 'DateFromTicks', 'TimeFromTicks', 'TimestampFromTicks',
......@@ -51,9 +52,15 @@ def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
def Binary(string):
def Binary(value):
"""Constructs an object capable of holding a binary (long) string value."""
return string
if isinstance(value, _BinaryString):
return value
return _BinaryString(base64.b64encode(value))
class _BinaryString(str):
    """``str`` subclass that adds no behaviour of its own.

    ``Binary`` returns instances of this type and passes existing
    instances through unchanged, so the type acts as a marker for values
    that have already been base64-encoded.
    """
class ColumnType(object):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment