Source code for aiochclient.client

import json as json_
import warnings
from enum import Enum
from types import TracebackType
from typing import Any, AsyncGenerator, BinaryIO, Dict, List, Optional, Type

from aiochclient.exceptions import ChClientError
from aiochclient.http_clients.abc import HttpClientABC
from aiochclient.records import FromJsonFabric, Record, RecordsFabric
from aiochclient.sql import sqlparse

# Optional cython extension:
try:
    from aiochclient._types import json2ch, py2ch, rows2ch
except ImportError:
    from aiochclient.types import json2ch, py2ch, rows2ch


class QueryTypes(Enum):
    FETCH = 0
    INSERT = 1
    OTHER = 2


[docs] class ChClient: """ChClient connection class. Usage: .. code-block:: python async with aiohttp.ClientSession() as s: client = ChClient(s, compress_response=True) nums = await client.fetch("SELECT number FROM system.numbers LIMIT 100") :param aiohttp.ClientSession session: aiohttp client session. Please, use one session and one ChClient for all connections in your app. :param str url: Clickhouse server url. Need full path, like "http://localhost:8123/". :param str user: User name for authorization. :param str password: Password for authorization. :param str database: Database name. :param bool compress_response: Pass True if you want Clickhouse to compress its responses with gzip. They will be decompressed automatically. But overall it will be slightly slower. :param **settings: Any settings from https://clickhouse.yandex/docs/en/operations/settings """ __slots__ = ("_session", "url", "params", "headers", "_json", "_http_client") def __init__( self, session=None, url: str = "http://localhost:8123/", user: str = None, password: str = None, database: str = "default", compress_response: bool = False, json=json_, # type: ignore **settings, ): _http_client = HttpClientABC.choose_http_client(session) self._http_client = _http_client(session) self.url = url self.params = {} self.headers = {} if user: self.headers["X-ClickHouse-User"] = user if password: self.headers["X-ClickHouse-Key"] = password if database: self.params["database"] = database if compress_response: self.params["enable_http_compression"] = 1 self._json = json self.params.update(settings) async def __aenter__(self) -> 'ChClient': return self async def __aexit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: await self.close()
[docs] async def close(self) -> None: """Close the session""" await self._http_client.close()
[docs] async def is_alive(self) -> bool: """Checks if connection is Ok. Usage: .. code-block:: python assert await client.is_alive() :return: True if connection Ok. False instead. """ try: await self._http_client.get( url=self.url, params={**self.params, "query": "SELECT 1"}, headers=self.headers, ) except ChClientError: return False return True
@staticmethod def _prepare_query_params(params: Optional[Dict[str, Any]] = None): if params is None: return {} if not isinstance(params, dict): raise TypeError('Query params must be a Dict[str, Any]') prepared_query_params = {} for key, value in params.items(): prepared_query_params[key] = py2ch(value).decode('utf-8') return prepared_query_params async def _execute( self, query: str, *args, json: bool = False, query_params: Optional[Dict[str, Any]] = None, query_id: str = None, decode: bool = True, ) -> AsyncGenerator[Record, None]: query_params = self._prepare_query_params(query_params) if query_params: query = query.format(**query_params) need_fetch, is_json, statement_type = self._parse_squery(query) if not is_json and json: query += " FORMAT JSONEachRow" is_json = True if not is_json and need_fetch: query += " FORMAT TSVWithNamesAndTypes" if args: if statement_type != 'INSERT': raise ChClientError( "It is possible to pass arguments only for INSERT queries" ) params = {**self.params, "query": query} if is_json: data = json2ch(*args, dumps=self._json.dumps) else: data = rows2ch(*args) else: params = {**self.params} data = query.encode() if query_id is not None: params["query_id"] = query_id if need_fetch: response = self._http_client.post_return_lines( url=self.url, params=params, headers=self.headers, data=data ) if is_json: rf = FromJsonFabric(loads=self._json.loads) async for line in response: yield rf.new(line) else: rf = RecordsFabric( names=await response.__anext__(), tps=await response.__anext__(), convert=decode, ) async for line in response: yield rf.new(line) else: await self._http_client.post_no_return( url=self.url, params=params, headers=self.headers, data=data )
[docs] async def execute( self, query: str, *args, json: bool = False, params: Optional[Dict[str, Any]] = None, query_id: str = None, ) -> None: """Execute query. Returns None. :param str query: Clickhouse query string. :param args: Arguments for insert queries. :param bool json: Execute query in JSONEachRow mode. :param Optional[Dict[str, Any]] params: Params to escape inside query string on field values. :param str query_id: Clickhouse query_id. Usage: .. code-block:: python await client.execute( "CREATE TABLE t (a UInt8, b Tuple(Date, Nullable(Float32)) ) ENGINE = Memory" ) await client.execute( "INSERT INTO t VALUES", (1, (dt.date(2018, 9, 7), None)), (2, (dt.date(2018, 9, 8), 3.14)), ) await client.execute( "SELECT * FROM t WHERE a={u8}", params={"u8": 12} ) :return: Nothing. """ async for _ in self._execute( query, *args, json=json, query_params=params, query_id=query_id ): return None
[docs] async def fetch( self, query: str, *args, json: bool = False, params: Optional[Dict[str, Any]] = None, query_id: str = None, decode: bool = True, ) -> List[Record]: """Execute query and fetch all rows from query result at once in a list. :param query: Clickhouse query string. :param bool json: Execute query in JSONEachRow mode. :param Optional[Dict[str, Any]] params: Params to escape inside query string. :param str query_id: Clickhouse query_id. :param decode: Decode to python types. If False, returns bytes for each field instead. Usage: .. code-block:: python all_rows = await client.fetch("SELECT * FROM t") :return: All rows from query. """ return [ row async for row in self._execute( query, *args, json=json, query_params=params, query_id=query_id, decode=decode, ) ]
[docs] async def fetchrow( self, query: str, *args, json: bool = False, params: Optional[Dict[str, Any]] = None, query_id: str = None, decode: bool = True, ) -> Optional[Record]: """Execute query and fetch first row from query result or None. :param query: Clickhouse query string. :param bool json: Execute query in JSONEachRow mode. :param Optional[Dict[str, Any]] params: Params to escape inside query string. :param str query_id: Clickhouse query_id. :param decode: Decode to python types. If False, returns bytes for each field instead. Usage: .. code-block:: python row = await client.fetchrow("SELECT * FROM t WHERE a=1") assert row[0] == 1 assert row["b"] == (dt.date(2018, 9, 7), None) :return: First row from query or None if there no results. """ async for row in self._execute( query, *args, json=json, query_params=params, query_id=query_id, decode=decode, ): return row return None
[docs] async def fetchone(self, query: str, *args) -> Optional[Record]: """Deprecated. Use ``fetchrow`` method instead""" warnings.warn( "'fetchone' method is deprecated. Use 'fetchrow' method instead", PendingDeprecationWarning, ) return await self.fetchrow(query, *args)
[docs] async def fetchval( self, query: str, *args, json: bool = False, params: Optional[Dict[str, Any]] = None, query_id: str = None, decode: bool = True, ) -> Any: """Execute query and fetch first value of the first row from query result or None. :param query: Clickhouse query string. :param bool json: Execute query in JSONEachRow mode. :param Optional[Dict[str, Any]] params: Params to escape inside query string. :param str query_id: Clickhouse query_id. :param decode: Decode to python types. If False, returns bytes for each field instead. Usage: .. code-block:: python val = await client.fetchval("SELECT b FROM t WHERE a=2") assert val == (dt.date(2018, 9, 8), 3.14) :return: First value of the first row or None if there no results. """ async for row in self._execute( query, *args, json=json, query_params=params, query_id=query_id, decode=decode, ): if row: return row[0] return None
[docs] async def iterate( self, query: str, *args, json: bool = False, params: Optional[Dict[str, Any]] = None, query_id: str = None, decode: bool = True, ) -> AsyncGenerator[Record, None]: """Async generator by all rows from query result. :param str query: Clickhouse query string. :param bool json: Execute query in JSONEachRow mode. :param Optional[Dict[str, Any]] params: Params to escape inside query string. :param str query_id: Clickhouse query_id. :param decode: Decode to python types. If False, returns bytes for each field instead. Usage: .. code-block:: python async for row in client.iterate( "SELECT number, number*2 FROM system.numbers LIMIT 10000" ): assert row[0] * 2 == row[1] async for row in client.iterate( "SELECT number, number*2 FROM system.numbers LIMIT {numbers_limit}", params={"numbers_limit": 10000} ): assert row[0] * 2 == row[1] :return: Rows one by one. """ async for row in self._execute( query, *args, json=json, query_params=params, query_id=query_id, decode=decode, ): yield row
[docs] async def cursor(self, query: str, *args) -> AsyncGenerator[Record, None]: """Deprecated. Use ``iterate`` method instead""" warnings.warn( "'cursor' method is deprecated. Use 'iterate' method instead", PendingDeprecationWarning, ) async for row in self.iterate(query, *args): yield row
[docs] async def insert_file( self, query: str, file_obj: BinaryIO, params: Optional[Dict[str, Any]] = None, ) -> None: """Insert file in any suppoted by ClickHouse format. Returns None. :param str query: Clickhouse query string which include format part. :param bool file_obj: File object to insert. :param Optional[Dict[str, Any]] params: Params to escape inside query string. Usage: .. code-block:: python with open('data.csv', 'rb') as f: await client.insert_file( "INSERT INTO t FORMAT CSV", f.read(), ) with open('data.json', 'rb') as f: await client.insert_file( "INSERT INTO t FORMAT JSONEachRow", f.read(), ) response = requests.get("https://url_to_download_parquet_file") await client.insert_file( "INSERT INTO t FORMAT Parquet", response.content, ) :return: Nothing. """ self._check_insert_file_query(query) query_params = self._prepare_query_params(params) if query_params: query = query.format(**query_params) params = {**self.params, "query": query} await self._http_client.post_no_return( url=self.url, params=params, headers=self.headers, data=file_obj, )
@staticmethod def _parse_squery(query): statement = sqlparse.parse(query)[0] statement_type = statement.get_type() if statement_type in ('SELECT', 'SHOW', 'DESCRIBE', 'EXISTS'): need_fetch = True else: need_fetch = False fmt = statement.token_matching( (lambda tk: tk.match(sqlparse.tokens.Keyword, 'FORMAT'),), 0 ) if fmt: is_json = statement.token_matching( (lambda tk: tk.match(None, ['JSONEachRow']),), statement.token_index(fmt) + 1, ) else: is_json = False return need_fetch, is_json, statement_type @staticmethod def _check_insert_file_query(query: str) -> None: statement = sqlparse.parse(query)[0] if statement.get_type() != 'INSERT': raise ChClientError('It is possible to insert file only with INSERT query') if not statement.token_matching( (lambda tk: tk.match(sqlparse.tokens.Keyword, 'FORMAT'),), 0 ): raise ChClientError( 'To insert file its required to specify `FORMAT [...] in the query.' )