avatica.py 17.8 KB
Newer Older
Lukáš Lalinský's avatar
Lukáš Lalinský committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright 2015 Lukas Lalinsky
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lukáš Lalinský's avatar
Lukáš Lalinský committed
15 16
"""Implementation of the JSON-over-HTTP RPC protocol used by Avatica."""

17
import re
Lukáš Lalinský's avatar
Lukáš Lalinský committed
18
import socket
Lukáš Lalinský's avatar
Lukáš Lalinský committed
19
import pprint
20
import math
Lukáš Lalinský's avatar
Lukáš Lalinský committed
21
import logging
22
import time
23
from phoenixdb import errors
24
from phoenixdb.calcite import requests_pb2, common_pb2, responses_pb2
Lukáš Lalinský's avatar
Lukáš Lalinský committed
25

Lukáš Lalinský's avatar
Lukáš Lalinský committed
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
try:
    import httplib
except ImportError:
    import http.client as httplib

try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse

try:
    from HTMLParser import HTMLParser
except ImportError:
    from html.parser import HTMLParser

Lukáš Lalinský's avatar
Lukáš Lalinský committed
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
# Only the client class is exported as this module's public API.
__all__ = ['AvaticaClient']

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class JettyErrorPageParser(HTMLParser):
    """Extracts the title and the message from a Jetty HTML error page.

    While parsing, ``path`` holds the stack of currently open tags.
    Text found directly in ``html > body > h2`` is collected into
    ``title`` and text found in ``html > body > p > pre`` is collected
    into ``message``; both are lists of whitespace-stripped text chunks.
    """

    # Elements that never have an end tag. They must not be pushed onto
    # the open-tag stack, otherwise every later path check is off by one.
    VOID_ELEMENTS = frozenset([
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'param', 'source', 'track', 'wbr',
    ])

    def __init__(self):
        HTMLParser.__init__(self)
        self.path = []
        self.title = []
        self.message = []

    def handle_starttag(self, tag, attrs):
        """Pushes a newly opened (non-void) tag onto the path stack."""
        if tag not in self.VOID_ELEMENTS:
            self.path.append(tag)

    def handle_endtag(self, tag):
        """Unwinds the path stack down to the matching start tag.

        End tags without a matching open tag are ignored instead of
        popping an unrelated entry (or failing on an empty stack).
        """
        if tag in self.path:
            while self.path.pop() != tag:
                pass

    def handle_data(self, data):
        """Collects text that sits at one of the two paths of interest."""
        if len(self.path) > 2 and self.path[0] == 'html' and self.path[1] == 'body':
            if len(self.path) == 3 and self.path[2] == 'h2':
                self.title.append(data.strip())
            elif len(self.path) == 4 and self.path[2] == 'p' and self.path[3] == 'pre':
                self.message.append(data.strip())


68 69 70 71 72 73 74 75 76 77
def parse_url(url):
    """Parses the server address into a ``urlparse.ParseResult``.

    Besides full URLs, the short forms ``host`` and ``host:port`` are
    accepted; they are expanded to ``http://host:port/`` with the port
    defaulting to 8765.

    :param url:
        Full URL, or a bare ``host`` / ``host:port`` string.

    :returns:
        The parsed URL.
    """
    parsed = urlparse.urlparse(url)
    if parsed.netloc:
        return parsed

    if not parsed.scheme and parsed.path:
        netloc = parsed.path
        if ':' not in netloc:
            netloc = '{}:8765'.format(netloc)
        return urlparse.ParseResult('http', netloc, '/', '', '', '')

    if parsed.scheme and parsed.path.isdigit():
        # Newer Python versions parse "host:8765" as scheme="host" and
        # path="8765". Rebuild the address from the original text, since
        # the parsed scheme has been lower-cased.
        host = url.partition(':')[0]
        netloc = '{}:{}'.format(host, parsed.path)
        return urlparse.ParseResult('http', netloc, '/', '', '', '')

    return parsed


78 79 80 81 82 83 84 85 86 87 88 89 90
# Maps SQLSTATE prefixes to the exception class raised by raise_sql_error().
# Entries are tried in order with str.startswith(), so the more specific
# '22018' entry has to stay ahead of the generic '22' class.
# Defined in phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
SQLSTATE_ERROR_CLASSES = [
    ('08', errors.OperationalError), # Connection Exception
    ('22018', errors.IntegrityError), # Constraint Violation (checked before the generic '22')
    ('22', errors.DataError), # Data Exception
    ('23', errors.IntegrityError), # Constraint Violation
    ('24', errors.InternalError), # Invalid Cursor State
    ('25', errors.InternalError), # Invalid Transaction State
    ('42', errors.ProgrammingError), # Syntax Error or Access Rule Violation
    ('XLC', errors.OperationalError), # Execution exceptions
    ('INT', errors.InternalError), # Phoenix internal error
]

91 92 93 94 95 96
# Relevant properties as defined by https://calcite.apache.org/avatica/docs/client_reference.html
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably consumed by callers that build the ``info`` dict for
# openConnection() -- confirm.
OPEN_CONNECTION_PROPERTIES = (
    'user', # User for the database connection
    'password', # Password for the user
)

97

98 99 100 101 102 103 104
def raise_sql_error(code, sqlstate, message):
    """Raises the exception class registered for the given SQLSTATE.

    The first entry of ``SQLSTATE_ERROR_CLASSES`` whose prefix matches
    the start of ``sqlstate`` decides the exception class. When no
    prefix matches, nothing is raised and ``None`` is returned.

    :param code:
        Numeric error code, passed through to the exception.

    :param sqlstate:
        SQLSTATE string used to select the exception class.

    :param message:
        Error message, passed through to the exception.
    """
    error_class = next(
        (cls for prefix, cls in SQLSTATE_ERROR_CLASSES if sqlstate.startswith(prefix)),
        None,
    )
    if error_class is not None:
        raise error_class(message, code, sqlstate)


def parse_and_raise_sql_error(message):
    """Looks for an ``ERROR <code> (<sqlstate>): <text> ->`` pattern and raises for it.

    Only the first occurrence in ``message`` is used; its code, SQLSTATE
    and text are handed to ``raise_sql_error``. Returns ``None`` when the
    pattern is not found, or when ``raise_sql_error`` does not raise.

    :param message:
        Error text received from the server.
    """
    found = re.findall(r'(?:([^ ]+): )?ERROR (\d+) \(([0-9A-Z]{5})\): (.*?) ->', message)
    if not found:
        return
    # The first group (optional exception name in front of "ERROR") is not used.
    _exception_name, code, sqlstate, text = found[0]
    raise_sql_error(int(code), sqlstate, text)


111 112 113 114 115
def parse_error_page(html):
    """Parses a Jetty HTML error page and raises the matching error.

    If the page title is exactly ``HTTP ERROR: 500``, the message found
    on the page is first checked for an embedded SQL error (raised via
    ``parse_and_raise_sql_error``) and otherwise raised as
    ``errors.InternalError``. For any other page the function returns
    ``None`` and leaves the error handling to the caller.

    :param html:
        Page content, either as text or as the raw bytes of the
        HTTP response body.
    """
    if isinstance(html, bytes):
        # The HTTP layer hands over raw bytes, but HTMLParser.feed() needs
        # text on Python 3. The charset is assumed to be UTF-8 compatible;
        # undecodable bytes are replaced rather than failing here.
        html = html.decode('utf-8', 'replace')

    parser = JettyErrorPageParser()
    parser.feed(html)
    if parser.title == ['HTTP ERROR: 500']:
        message = ' '.join(parser.message).strip()
        parse_and_raise_sql_error(message)
        raise errors.InternalError(message)


120 121 122 123
def parse_error_protobuf(text):
    """Decodes a protobuf error response and raises the matching error.

    ``text`` is parsed as a ``WireMessage`` whose wrapped payload is an
    ``ErrorResponse``. The error message is first scanned for an embedded
    SQL error, then the response's own error code / SQLSTATE are tried,
    and finally ``errors.InternalError`` is raised as the fallback, so
    this function never returns normally.

    :param text:
        Serialized ``WireMessage`` received from the server.
    """
    wire = common_pb2.WireMessage()
    wire.ParseFromString(text)

    error = responses_pb2.ErrorResponse()
    error.ParseFromString(wire.wrapped_message)

    error_message = error.error_message
    parse_and_raise_sql_error(error_message)
    raise_sql_error(error.error_code, error.sql_state, error_message)
    raise errors.InternalError(error_message)
130 131


Lukáš Lalinský's avatar
Lukáš Lalinský committed
132 133 134 135 136 137 138 139 140
class AvaticaClient(object):
    """Client for Avatica's RPC server.

    This exposes all low-level functionality that the Avatica
    server provides, using the native terminology. You most likely
    do not want to use this class directly, but rather connect
    to a server using :func:`phoenixdb.connect`.
    """

141
    def __init__(self, url, max_retries=None):
        """Constructs a new client object.

        No connection is opened here; ``connection`` stays ``None``
        until :meth:`connect` is called.

        :param url:
            URL of an Avatica RPC server.

        :param max_retries:
            Number of retries allowed for a single request;
            ``None`` selects the default of 3.
        """
        self.url = parse_url(url)
        if max_retries is None:
            max_retries = 3
        self.max_retries = max_retries
        self.connection = None

    def connect(self):
        """Opens a HTTP connection to the RPC server.

        :raises errors.InterfaceError:
            If the connection cannot be established; the underlying
            exception is attached as ``cause``.
        """
        logger.debug("Opening connection to %s:%s", self.url.hostname, self.url.port)
        try:
            self.connection = httplib.HTTPConnection(self.url.hostname, self.url.port)
            self.connection.connect()
        except (httplib.HTTPException, socket.error) as e:
            # Pass the original exception by keyword, the same way
            # _post_request() does. Positionally it would land in the
            # second argument, which raise_sql_error() uses for the
            # numeric error code.
            raise errors.InterfaceError('Unable to connect to the specified service', cause=e)
Lukáš Lalinský's avatar
Lukáš Lalinský committed
159 160 161 162 163 164 165 166 167 168 169

    def close(self):
        """Closes the HTTP connection to the RPC server."""
        if self.connection is not None:
            logger.debug("Closing connection to %s:%s", self.url.hostname, self.url.port)
            try:
                self.connection.close()
            except httplib.HTTPException as e:
                logger.warning("Error while closing connection", exc_info=True)
            self.connection = None

170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
    def _post_request(self, body, headers):
        """POSTs a request to the server, retrying on transient failures.

        Up to ``self.max_retries`` retries are made when the HTTP layer
        raises an exception or the server answers with 503 Service
        Unavailable. The pause before a retry is ``exp(-retries_left)``
        seconds, so it grows as the remaining retries shrink.

        :param body:
            Serialized request body.

        :param headers:
            Dictionary of HTTP headers to send.

        :returns:
            The HTTP response object; its body has not been read yet.
            Once the retries are used up, a 503 response is returned
            like any other response.

        :raises errors.InterfaceError:
            If the request keeps failing at the HTTP protocol level.
        """
        retry_count = self.max_retries
        while True:
            logger.debug("POST %s %r %r", self.url.path, body, headers)
            try:
                self.connection.request('POST', self.url.path, body=body, headers=headers)
                response = self.connection.getresponse()
            except httplib.HTTPException as e:
                if retry_count > 0:
                    delay = math.exp(-retry_count)
                    logger.debug("HTTP protocol error, will retry in %s seconds...", delay, exc_info=True)
                    # Start over with a fresh connection before retrying.
                    self.close()
                    self.connect()
                    time.sleep(delay)
                    retry_count -= 1
                    continue
                raise errors.InterfaceError('RPC request failed', cause=e)
            else:
                if response.status == httplib.SERVICE_UNAVAILABLE and retry_count > 0:
                    delay = math.exp(-retry_count)
                    # No exception is being handled here, so no exc_info.
                    logger.debug("Service unavailable, will retry in %s seconds...", delay)
                    # The whole response has to be read before the same
                    # connection can send another request; otherwise the
                    # retry fails with ResponseNotReady and wastes an
                    # attempt on a reconnect.
                    response.read()
                    time.sleep(delay)
                    retry_count -= 1
                    continue
                return response

Lukáš Lalinský's avatar
Lukáš Lalinský committed
197 198
    def _apply(self, request_data, expected_response_type=None):
        """Sends a protobuf request and returns the wrapped response payload.

        The request is wrapped into a ``WireMessage`` named after its
        class, posted to the server, and the answer is unwrapped again.

        :param request_data:
            Protobuf request message to send.

        :param expected_response_type:
            Short name of the expected response message. Defaults to the
            request's class name with ``Request`` replaced by ``Response``.

        :returns:
            The serialized response message (``wrapped_message``).

        :raises errors.InterfaceError:
            If the HTTP status is not 200 and no more specific error could
            be derived from the body, or if the response has an unexpected
            type.
        """
        logger.debug("Sending request\n%s", pprint.pformat(request_data))

        request_name = request_data.__class__.__name__

        outgoing = common_pb2.WireMessage()
        outgoing.name = 'org.apache.calcite.avatica.proto.Requests${}'.format(request_name)
        outgoing.wrapped_message = request_data.SerializeToString()

        response = self._post_request(
            outgoing.SerializeToString(),
            {'content-type': 'application/x-google-protobuf'},
        )
        response_body = response.read()

        if response.status != httplib.OK:
            logger.debug("Received response\n%s", response_body)
            # HTML bodies are error pages; anything else is assumed to be
            # a protobuf-encoded error response.
            handler = parse_error_page if b'<html>' in response_body else parse_error_protobuf
            handler(response_body)
            raise errors.InterfaceError('RPC request returned invalid status code', response.status)

        incoming = common_pb2.WireMessage()
        incoming.ParseFromString(response_body)
        logger.debug("Received response\n%s", incoming)

        if expected_response_type is None:
            expected_response_type = request_name.replace('Request', 'Response')
        expected_name = 'org.apache.calcite.avatica.proto.Responses$' + expected_response_type

        if incoming.name != expected_name:
            raise errors.InterfaceError('unexpected response type "{}"'.format(incoming.name))
        return incoming.wrapped_message
Lukáš Lalinský's avatar
Lukáš Lalinský committed
232

233 234 235
    def getCatalogs(self, connectionId):
        """Sends a ``CatalogsRequest`` for the given connection.

        :param connectionId:
            ID of the current connection.

        :returns:
            The serialized response payload as returned by ``_apply``.
        """
        catalogs_request = requests_pb2.CatalogsRequest()
        catalogs_request.connection_id = connectionId
        return self._apply(catalogs_request)

238
    def getSchemas(self, connectionId, catalog=None, schemaPattern=None):
        """Sends a ``SchemasRequest`` for the given connection.

        :param connectionId:
            ID of the current connection.

        :param catalog:
            Catalog filter; left unset on the request when ``None``.

        :param schemaPattern:
            Schema pattern filter; left unset on the request when ``None``.

        :returns:
            The serialized response payload as returned by ``_apply``.
        """
        request = requests_pb2.SchemasRequest()
        request.connection_id = connectionId

        optional_fields = (
            ('catalog', catalog),
            ('schema_pattern', schemaPattern),
        )
        for field_name, value in optional_fields:
            if value is not None:
                setattr(request, field_name, value)

        return self._apply(request)

247
    def getTables(self, connectionId, catalog=None, schemaPattern=None, tableNamePattern=None, typeList=None):
        """Sends a ``TablesRequest`` for the given connection.

        :param connectionId:
            ID of the current connection.

        :param catalog:
            Catalog filter; left unset on the request when ``None``.

        :param schemaPattern:
            Schema pattern filter; left unset on the request when ``None``.

        :param tableNamePattern:
            Table name pattern filter; left unset on the request when ``None``.

        :param typeList:
            Iterable of table types to include; ``None`` sends no type list.

        :returns:
            The serialized response payload as returned by ``_apply``.
        """
        request = requests_pb2.TablesRequest()
        request.connection_id = connectionId
        if catalog is not None:
            request.catalog = catalog
        if schemaPattern is not None:
            request.schema_pattern = schemaPattern
        if tableNamePattern is not None:
            request.table_name_pattern = tableNamePattern
        if typeList is not None:
            # type_list is a repeated field: it cannot be assigned to
            # directly, it has to be filled in place.
            request.type_list.extend(typeList)
        request.has_type_list = typeList is not None
        return self._apply(request)

263 264 265 266 267 268 269 270 271 272 273
    def getColumns(self, connectionId, catalog=None, schemaPattern=None, tableNamePattern=None, columnNamePattern=None):
        """Sends a ``ColumnsRequest`` for the given connection.

        :param connectionId:
            ID of the current connection.

        :param catalog:
            Catalog filter; left unset on the request when ``None``.

        :param schemaPattern:
            Schema pattern filter; left unset on the request when ``None``.

        :param tableNamePattern:
            Table name pattern filter; left unset on the request when ``None``.

        :param columnNamePattern:
            Column name pattern filter; left unset on the request when ``None``.

        :returns:
            The serialized response payload as returned by ``_apply``.
        """
        request = requests_pb2.ColumnsRequest()
        request.connection_id = connectionId

        optional_fields = (
            ('catalog', catalog),
            ('schema_pattern', schemaPattern),
            ('table_name_pattern', tableNamePattern),
            ('column_name_pattern', columnNamePattern),
        )
        for field_name, value in optional_fields:
            if value is not None:
                setattr(request, field_name, value)

        return self._apply(request)

276 277 278
    def getTableTypes(self, connectionId):
        """Sends a ``TableTypesRequest`` for the given connection.

        :param connectionId:
            ID of the current connection.

        :returns:
            The serialized response payload as returned by ``_apply``.
        """
        table_types_request = requests_pb2.TableTypesRequest()
        table_types_request.connection_id = connectionId
        return self._apply(table_types_request)

281 282 283
    def getTypeInfo(self, connectionId):
        """Sends a ``TypeInfoRequest`` for the given connection.

        :param connectionId:
            ID of the current connection.

        :returns:
            The serialized response payload as returned by ``_apply``.
        """
        type_info_request = requests_pb2.TypeInfoRequest()
        type_info_request.connection_id = connectionId
        return self._apply(type_info_request)

    def connectionSync(self, connectionId, connProps=None):
        """Synchronizes connection properties with the server.

        Every property is always sent; keys missing from ``connProps``
        fall back to ``False`` (``autoCommit``, ``readOnly``), ``0``
        (``transactionIsolation``) or an empty string (``catalog``,
        ``schema``).

        :param connectionId:
            ID of the current connection.

        :param connProps:
            Dictionary with the properties that should be changed.

        :returns:
            A ``common_pb2.ConnectionProperties`` object.
        """
        props = {} if connProps is None else connProps

        request = requests_pb2.ConnectionSyncRequest()
        request.connection_id = connectionId

        target = request.conn_props
        target.auto_commit = props.get('autoCommit', False)
        target.has_auto_commit = True
        target.read_only = props.get('readOnly', False)
        target.has_read_only = True
        target.transaction_isolation = props.get('transactionIsolation', 0)
        target.catalog = props.get('catalog', '')
        target.schema = props.get('schema', '')

        response = responses_pb2.ConnectionSyncResponse()
        response.ParseFromString(self._apply(request))
        return response.conn_props
Lukáš Lalinský's avatar
Lukáš Lalinský committed
315

316 317 318 319
    def openConnection(self, connectionId, info=None):
        """Opens a new connection.

        The server's response is parsed but nothing is returned.

        :param connectionId:
            ID of the connection to open.

        :param info:
            Optional dictionary of connection properties, copied into the
            request entry by entry.
        """
        request = requests_pb2.OpenConnectionRequest()
        request.connection_id = connectionId

        if info is not None:
            # The entries have to be copied one by one; assigning the
            # dict to the field directly fails.
            for key, value in info.items():
                request.info[key] = value

        reply = responses_pb2.OpenConnectionResponse()
        reply.ParseFromString(self._apply(request))
332

Lukáš Lalinský's avatar
Lukáš Lalinský committed
333 334 335 336 337 338
    def closeConnection(self, connectionId):
        """Closes a connection.

        The response payload is discarded.

        :param connectionId:
            ID of the connection to close.
        """
        close_request = requests_pb2.CloseConnectionRequest()
        close_request.connection_id = connectionId
        self._apply(close_request)

    def createStatement(self, connectionId):
        """Creates a new statement.

        :param connectionId:
            ID of the current connection.

        :returns:
            New statement ID.
        """
        create_request = requests_pb2.CreateStatementRequest()
        create_request.connection_id = connectionId

        reply = responses_pb2.CreateStatementResponse()
        reply.ParseFromString(self._apply(create_request))
        return reply.statement_id
Lukáš Lalinský's avatar
Lukáš Lalinský committed
359 360 361 362 363 364 365 366 367 368

    def closeStatement(self, connectionId, statementId):
        """Closes a statement.

        The response payload is discarded.

        :param connectionId:
            ID of the current connection.

        :param statementId:
            ID of the statement to close.
        """
        close_request = requests_pb2.CloseStatementRequest()
        close_request.connection_id = connectionId
        close_request.statement_id = statementId
        self._apply(close_request)

    def prepareAndExecute(self, connectionId, statementId, sql, maxRowCount=-1):
        """Prepares and immediately executes a statement.

        :param connectionId:
            ID of the current connection.

        :param statementId:
            ID of the statement to prepare.

        :param sql:
            SQL query.

        :param maxRowCount:
            Maximum number of rows to return; negative means no limit.

        :returns:
            The ``results`` field of the parsed ``ExecuteResponse``.
        """
        request = requests_pb2.PrepareAndExecuteRequest()
        request.connection_id = connectionId
        request.statement_id = statementId
        request.sql = sql
        request.max_row_count = maxRowCount
        request.max_rows_total = -1

        # The server answers this request type with an ExecuteResponse,
        # so the expected response name is given explicitly.
        payload = self._apply(request, 'ExecuteResponse')
        reply = responses_pb2.ExecuteResponse()
        reply.ParseFromString(payload)
        return reply.results
Lukáš Lalinský's avatar
Lukáš Lalinský committed
404

405
    def prepare(self, connectionId, sql, maxRowCount=-1):
        """Prepares a statement.

        :param connectionId:
            ID of the current connection.

        :param sql:
            SQL query.

        :param maxRowCount:
            Maximum number of rows to return; negative means no limit.

        :returns:
            The ``statement`` field of the parsed ``PrepareResponse``.
        """
        prepare_request = requests_pb2.PrepareRequest()
        prepare_request.connection_id = connectionId
        prepare_request.sql = sql
        prepare_request.max_row_count = maxRowCount
        prepare_request.max_rows_total = -1

        reply = responses_pb2.PrepareResponse()
        reply.ParseFromString(self._apply(prepare_request))
        return reply.statement

    def execute(self, connectionId, statementId, signature, parameterValues=None, maxRowCount=-1):
        """Executes a statement identified by its handle.

        :param connectionId:
            ID of the current connection.

        :param statementId:
            ID of the statement to execute.

        :param signature:
            common_pb2.Signature object

        :param parameterValues:
            A list of parameter values to send with the request, or
            ``None`` to send none.

        :param maxRowCount:
            Accepted but currently not sent to the server (see the TODO
            in the body).

        :returns:
            The ``results`` field of the parsed ``ExecuteResponse``.
        """
        request = requests_pb2.ExecuteRequest()

        handle = request.statementHandle
        handle.id = statementId
        handle.connection_id = connectionId

        if parameterValues is not None:
            request.parameter_values.extend(parameterValues)
        request.has_parameter_values = parameterValues is not None

        handle.signature.CopyFrom(signature)
        # TODO ExecuteRequest has no max_row_count

        reply = responses_pb2.ExecuteResponse()
        reply.ParseFromString(self._apply(request))
        return reply.results
469

470
    def fetch(self, connectionId, statementId, parameterValues=None, offset=0, fetchMaxRowCount=-1):
        """Returns a frame of rows.

        The frame describes whether there may be another frame. If there is
        not another frame, the current iteration is done when we have
        finished the rows in this frame.

        :param connectionId:
            ID of the current connection.

        :param statementId:
            ID of the statement to fetch rows from.

        :param parameterValues:
            Accepted but not used when building the request.

        :param offset:
            Zero-based offset of first row in the requested frame.

        :param fetchMaxRowCount:
            Maximum number of rows to return; negative means no limit.

        :returns:
            The ``frame`` field of the parsed ``FetchResponse``.
        """
        fetch_request = requests_pb2.FetchRequest()
        fetch_request.connection_id = connectionId
        fetch_request.statement_id = statementId
        fetch_request.offset = offset
        fetch_request.fetch_max_row_count = fetchMaxRowCount

        reply = responses_pb2.FetchResponse()
        reply.ParseFromString(self._apply(fetch_request))
        return reply.frame