Source code for aiochclient.client

import json as json_
import re
import warnings
from enum import Enum
from types import TracebackType
from typing import Any, AsyncGenerator, BinaryIO, Dict, List, Optional, Type

from aiochclient.binary import (
    fetch_column_header,
    fetch_column_types,
    rows_from_binary,
    rows_to_binary,
)
from aiochclient.native import (
    blocks_from_native,
    rows_from_native,
    rows_to_native,
    rows_to_native_stream,
)
from aiochclient.exceptions import ChClientError
from aiochclient.http_clients.abc import HttpClientABC
from aiochclient.records import FromJsonFabric, Record, RecordsFabric
from aiochclient.sql import sqlparse

# Optional cython extension:
try:
    from aiochclient._types import json2ch, py2ch, rows2ch
except ImportError:
    from aiochclient.types import json2ch, py2ch, rows2ch


class QueryTypes(Enum):
    """Three statement categories: ``FETCH`` (0), ``INSERT`` (1), ``OTHER`` (2).

    NOTE(review): not referenced by ``ChClient`` in this module; presumably
    consumed elsewhere or kept for backward compatibility — confirm before
    removing.
    """

    # Members are declared in value order via unpacking; each name is stored
    # individually, so this is equivalent to three separate assignments.
    FETCH, INSERT, OTHER = range(3)


class ChClient:
    """ChClient connection class.

    Usage:

    .. code-block:: python

        async with aiohttp.ClientSession() as s:
            client = ChClient(s, compress_response=True)
            nums = await client.fetch("SELECT number FROM system.numbers LIMIT 100")

    :param aiohttp.ClientSession session:
        aiohttp client session. Please, use one session
        and one ChClient for all connections in your app.

    :param str url:
        Clickhouse server url. Need full path, like "http://localhost:8123/".

    :param str user:
        User name for authorization (sent as the ``X-ClickHouse-User`` header).

    :param str password:
        Password for authorization (sent as the ``X-ClickHouse-Key`` header).

    :param str database:
        Database name.

    :param bool compress_response:
        Pass True if you want Clickhouse to compress its responses with gzip.
        They will be decompressed automatically. But overall it will be slightly slower.

    :param json:
        Module-like object providing ``dumps`` and ``loads``; used to encode
        JSONEachRow inserts and decode JSONEachRow results. Defaults to the
        standard library ``json`` module.

    :param bool binary:
        Use the binary ``RowBinary`` engine for results and inserts instead of
        the default text (TSV) format. Faster, and the best choice for
        streaming row-by-row via ``iterate``.

    :param bool native:
        Use the columnar ``Native`` engine — the fastest of the engines for
        bulk SELECT and INSERT, decoding whole columns at once (no numpy).
        Takes precedence over ``binary`` if both are set. Caveats:
        ``decode=False`` (raw bytes) is not supported, and the Native format
        carries no per-column timezone, so a tz-aware ``DateTime``/``DateTime64``
        is returned as a naive UTC ``datetime``.

    :param int insert_block_size:
        Rows per streamed block for a ``native=True`` INSERT (default 8192).
        The body is streamed as Native blocks so the server inserts one while
        the client encodes the next; a multi-block insert is therefore not
        atomic on a client-side encoding error (blocks already sent are
        committed). Set to 0 to send a single atomic block (no streaming
        overlap).

    :param \\*\\*settings:
        Any settings from https://clickhouse.com/docs/en/operations/settings/settings
    """

    __slots__ = (
        "_session",
        "url",
        "params",
        "headers",
        "_json",
        "_http_client",
        "_binary",
        "_native",
        "_insert_block_size",
    )

    def __init__(
        self,
        session=None,
        url: str = "http://localhost:8123/",
        user: Optional[str] = None,
        password: Optional[str] = None,
        database: str = "default",
        compress_response: bool = False,
        json=json_,  # type: ignore
        binary: bool = False,
        native: bool = False,
        insert_block_size: int = 8192,
        **settings,
    ):
        # Pick the HTTP client implementation that matches the session type.
        _http_client = HttpClientABC.choose_http_client(session)
        self._http_client = _http_client(session)
        self.url = url
        # Query-string parameters and headers sent with every request.
        self.params = {}
        self.headers = {}
        if user:
            self.headers["X-ClickHouse-User"] = user
        if password:
            self.headers["X-ClickHouse-Key"] = password
        if database:
            self.params["database"] = database
        if compress_response:
            self.params["enable_http_compression"] = 1
        self._json = json
        # Decode SELECT results with the RowBinary / Native engine instead of TSV.
        self._binary = binary
        self._native = native
        # Rows per streamed Native-INSERT block. Streaming overlaps the
        # server-side insert with the next block's encoding, but a multi-block
        # body is not atomic on a client-side encoding error. Set to 0 to send
        # one atomic block instead (no overlap). Inserts of fewer rows than this
        # are a single block regardless, so they stay atomic.
        self._insert_block_size = insert_block_size
        # Extra ClickHouse settings travel as query-string parameters.
        self.params.update(settings)

    async def __aenter__(self) -> 'ChClient':
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        await self.close()

    async def close(self) -> None:
        """Close the session"""
        await self._http_client.close()

    async def is_alive(self) -> bool:
        """Checks if connection is Ok.

        Usage:

        .. code-block:: python

            assert await client.is_alive()

        :return: True if connection Ok. False instead.
        """
        try:
            await self._http_client.get(
                url=self.url,
                params={**self.params, "query": "SELECT 1"},
                headers=self.headers,
            )
        except ChClientError:
            return False
        return True

    @staticmethod
    def _prepare_query_params(params: Optional[Dict[str, Any]] = None):
        """Escape user query params into SQL literal strings.

        :param params: Mapping of placeholder name to Python value, or None.
        :return: A new dict mapping each name to ``py2ch(value)`` decoded as
            UTF-8; an empty dict when ``params`` is None.
        :raises TypeError: If ``params`` is neither None nor a dict.
        """
        if params is None:
            return {}
        if not isinstance(params, dict):
            raise TypeError('Query params must be a Dict[str, Any]')
        return {key: py2ch(value).decode('utf-8') for key, value in params.items()}

    async def _execute(
        self,
        query: str,
        *args,
        json: bool = False,
        query_params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
        decode: bool = True,
    ) -> AsyncGenerator[Record, None]:
        """Run ``query`` and yield result records (nothing for non-fetch queries).

        The output format appended to fetch queries depends on the engine:
        ``Native`` when ``native`` is set, ``RowBinaryWithNamesAndTypes`` when
        ``binary`` is set, otherwise ``TSVWithNamesAndTypes``. ``json=True``
        (or a ``FORMAT JSONEachRow`` already in the query) overrides all of
        them. ``decode`` only affects the TSV path; the binary/native decoders
        are not given it.

        :raises ChClientError: If ``args`` are passed to a non-INSERT query.
        """
        query_params = self._prepare_query_params(query_params)
        if query_params:
            query = query.format(**query_params)
        need_fetch, is_json, statement_type = self._parse_squery(query)

        if not is_json and json:
            query += " FORMAT JSONEachRow"
            is_json = True

        if not is_json and need_fetch:
            if self._native:
                query += " FORMAT Native"
            elif self._binary:
                query += " FORMAT RowBinaryWithNamesAndTypes"
            else:
                query += " FORMAT TSVWithNamesAndTypes"

        if args:
            if statement_type != 'INSERT':
                raise ChClientError(
                    "It is possible to pass arguments only for INSERT queries"
                )
            if self._native and not is_json:
                # Encode the rows column-by-column as Native, using the target
                # column names and types fetched from the server.
                names, types = await self._fetch_insert_column_header(query)
                query = self._columnar_insert_query(query, "Native")
                if self._insert_block_size:
                    # Stream the body as multiple Native blocks so the server
                    # inserts one while the client encodes the next.
                    data = rows_to_native_stream(
                        args, names, types, self._insert_block_size
                    )
                else:
                    # One atomic block: encode fully, then send.
                    data = rows_to_native(args, names, types)
            elif self._binary and not is_json:
                # RowBinary is type-specific, so fetch the target column types
                # and encode the rows to match them exactly.
                types = await self._fetch_insert_column_types(query)
                query = self._columnar_insert_query(query, "RowBinary")
                data = rows_to_binary(args, types)
            elif is_json:
                data = json2ch(*args, dumps=self._json.dumps)
            else:
                data = rows2ch(*args)
            # With args the rows are the body, so the query goes in the URL.
            params = {**self.params, "query": query}
        else:
            params = {**self.params}
            data = query.encode()

        if query_id is not None:
            params["query_id"] = query_id

        if need_fetch:
            if (self._native or self._binary) and not is_json:
                source = self._http_client.post_return_bytes(
                    url=self.url, params=params, headers=self.headers, data=data
                )
                driver = rows_from_native if self._native else rows_from_binary
                async for record in driver(source):
                    yield record
                return
            response = self._http_client.post_return_lines(
                url=self.url, params=params, headers=self.headers, data=data
            )
            if is_json:
                rf = FromJsonFabric(loads=self._json.loads)
                async for line in response:
                    yield rf.new(line)
            else:
                # TSVWithNamesAndTypes: first line holds names, second types.
                rf = RecordsFabric(
                    names=await response.__anext__(),
                    tps=await response.__anext__(),
                    convert=decode,
                )
                async for line in response:
                    yield rf.new(line)
        else:
            await self._http_client.post_no_return(
                url=self.url, params=params, headers=self.headers, data=data
            )

    def _native_block_stream(
        self,
        query: str,
        params: Optional[Dict[str, Any]],
        query_id: Optional[str],
    ):
        """Native-block source for a fetch-style query, or ``None`` if N/A.

        ``fetch`` collects a whole result into a list, so it can consume the
        Native stream a block at a time and ``list.extend`` each block —
        keeping the per-row work out of the (much costlier) async-generator
        protocol. Returns ``None`` for queries that don't fetch in Native (so
        the caller falls back to the generic row path).
        """
        query_params = self._prepare_query_params(params)
        if query_params:
            query = query.format(**query_params)
        need_fetch, is_json, _ = self._parse_squery(query)
        if not need_fetch or is_json:
            return None
        query += " FORMAT Native"
        request_params = {**self.params}
        if query_id is not None:
            request_params["query_id"] = query_id
        source = self._http_client.post_return_bytes(
            url=self.url,
            params=request_params,
            headers=self.headers,
            data=query.encode(),
        )
        return blocks_from_native(source)

    @staticmethod
    def _columnar_insert_query(query: str, fmt: str) -> str:
        """Swap the trailing ``VALUES`` clause for ``FORMAT <fmt>``.

        Everything from the first whitespace-preceded ``VALUES`` keyword
        (case-insensitive) to the end is dropped, so the binary body is parsed
        as RowBinary/Native rather than as VALUES tuples. If there is no
        ``VALUES``, the FORMAT clause is simply appended.
        """
        head = re.sub(r"\s+VALUES\b.*$", "", query, flags=re.IGNORECASE | re.DOTALL)
        return head.rstrip() + f" FORMAT {fmt}"

    def _insert_column_probe(self, query: str):
        """Start a ``LIMIT 0`` probe describing the INSERT's target columns.

        The target table and optional column list are parsed from ``query``.
        ClickHouse's optional ``TABLE`` keyword (``INSERT INTO TABLE t ...``)
        is skipped, while a table literally named ``table`` still parses. The
        probe selects those columns (or ``*``) in the order the INSERT lists
        them, as ``RowBinaryWithNamesAndTypes``, whose header carries their
        names and exact types.

        :return: Whatever ``post_return_bytes`` returns for the probe request.
        :raises ChClientError: If the INSERT target cannot be parsed.
        """
        match = re.match(
            r"\s*INSERT\s+INTO\s+"
            # Optional TABLE keyword, unless "table" is itself the table name
            # (i.e. what follows is a clause keyword rather than a name).
            r"(?:TABLE\s+(?!(?:VALUES|FORMAT|SELECT)\b))?"
            r"(?P<table>[^\s(]+)\s*(?:\((?P<cols>[^)]*)\))?",
            query,
            re.IGNORECASE,
        )
        if not match:
            raise ChClientError("Could not parse the INSERT target for binary mode")
        cols = match.group("cols")
        probe = (
            f"SELECT {cols.strip() if cols else '*'} "
            f"FROM {match.group('table')} LIMIT 0 FORMAT RowBinaryWithNamesAndTypes"
        )
        return self._http_client.post_return_bytes(
            url=self.url,
            params={**self.params},
            headers=self.headers,
            data=probe.encode(),
        )

    async def _fetch_insert_column_types(self, query: str) -> list:
        """Return the target column types of an INSERT (RowBinary path)."""
        return await fetch_column_types(self._insert_column_probe(query))

    async def _fetch_insert_column_header(self, query: str):
        """Return the target column header (names, types) of an INSERT (Native path)."""
        return await fetch_column_header(self._insert_column_probe(query))

    async def execute(
        self,
        query: str,
        *args,
        json: bool = False,
        params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
    ) -> None:
        """Execute query. Returns None.

        :param str query: Clickhouse query string.
        :param args: Arguments for insert queries.
        :param bool json: Execute query in JSONEachRow mode.
        :param Optional[Dict[str, Any]] params: Params to escape inside query string on field values.
        :param str query_id: Clickhouse query_id.

        Usage:

        .. code-block:: python

            await client.execute(
                "CREATE TABLE t (a UInt8, b Tuple(Date, Nullable(Float32)) ) ENGINE = Memory"
            )
            await client.execute(
                "INSERT INTO t VALUES",
                (1, (dt.date(2018, 9, 7), None)),
                (2, (dt.date(2018, 9, 8), 3.14)),
            )
            await client.execute(
                "SELECT * FROM t WHERE a={u8}",
                params={"u8": 12}
            )

        :return: Nothing.
        """
        # Drive the generator; for fetch queries stop after the first row.
        async for _ in self._execute(
            query, *args, json=json, query_params=params, query_id=query_id
        ):
            return None

    async def fetch(
        self,
        query: str,
        *args,
        json: bool = False,
        params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
        decode: bool = True,
    ) -> List[Record]:
        """Execute query and fetch all rows from query result at once in a list.

        :param query: Clickhouse query string.
        :param bool json: Execute query in JSONEachRow mode.
        :param Optional[Dict[str, Any]] params: Params to escape inside query string.
        :param str query_id: Clickhouse query_id.
        :param decode: Decode to python types. If False, returns bytes for each field instead.
            Not supported in ``native`` or ``binary`` mode (ignored there).

        Usage:

        .. code-block:: python

            all_rows = await client.fetch("SELECT * FROM t")

        :return: All rows from query.
        """
        if self._native and not json and not args:
            # Fast path: extend the list a whole Native block at a time.
            blocks = self._native_block_stream(query, params, query_id)
            if blocks is not None:
                result: List[Record] = []
                async for block in blocks:
                    result.extend(block)
                return result
        return [
            row
            async for row in self._execute(
                query,
                *args,
                json=json,
                query_params=params,
                query_id=query_id,
                decode=decode,
            )
        ]

    async def fetchrow(
        self,
        query: str,
        *args,
        json: bool = False,
        params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
        decode: bool = True,
    ) -> Optional[Record]:
        """Execute query and fetch first row from query result or None.

        :param query: Clickhouse query string.
        :param bool json: Execute query in JSONEachRow mode.
        :param Optional[Dict[str, Any]] params: Params to escape inside query string.
        :param str query_id: Clickhouse query_id.
        :param decode: Decode to python types. If False, returns bytes for each field instead.

        Usage:

        .. code-block:: python

            row = await client.fetchrow("SELECT * FROM t WHERE a=1")
            assert row[0] == 1
            assert row["b"] == (dt.date(2018, 9, 7), None)

        :return: First row from query or None if there no results.
        """
        async for row in self._execute(
            query,
            *args,
            json=json,
            query_params=params,
            query_id=query_id,
            decode=decode,
        ):
            return row
        return None

    async def fetchone(self, query: str, *args) -> Optional[Record]:
        """Deprecated. Use ``fetchrow`` method instead"""
        # stacklevel=2 attributes the warning to the caller, not this module.
        warnings.warn(
            "'fetchone' method is deprecated. Use 'fetchrow' method instead",
            PendingDeprecationWarning,
            stacklevel=2,
        )
        return await self.fetchrow(query, *args)

    async def fetchval(
        self,
        query: str,
        *args,
        json: bool = False,
        params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
        decode: bool = True,
    ) -> Any:
        """Execute query and fetch first value of the first row from query result or None.

        :param query: Clickhouse query string.
        :param bool json: Execute query in JSONEachRow mode.
        :param Optional[Dict[str, Any]] params: Params to escape inside query string.
        :param str query_id: Clickhouse query_id.
        :param decode: Decode to python types. If False, returns bytes for each field instead.

        Usage:

        .. code-block:: python

            val = await client.fetchval("SELECT b FROM t WHERE a=2")
            assert val == (dt.date(2018, 9, 8), 3.14)

        :return: First value of the first row or None if there no results.
        """
        async for row in self._execute(
            query,
            *args,
            json=json,
            query_params=params,
            query_id=query_id,
            decode=decode,
        ):
            # Falsy rows are skipped and the next row is tried.
            if row:
                return row[0]
        return None

    async def iterate(
        self,
        query: str,
        *args,
        json: bool = False,
        params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
        decode: bool = True,
    ) -> AsyncGenerator[Record, None]:
        """Async generator by all rows from query result.

        :param str query: Clickhouse query string.
        :param bool json: Execute query in JSONEachRow mode.
        :param Optional[Dict[str, Any]] params: Params to escape inside query string.
        :param str query_id: Clickhouse query_id.
        :param decode: Decode to python types. If False, returns bytes for each field instead.

        Usage:

        .. code-block:: python

            async for row in client.iterate(
                "SELECT number, number*2 FROM system.numbers LIMIT 10000"
            ):
                assert row[0] * 2 == row[1]

            async for row in client.iterate(
                "SELECT number, number*2 FROM system.numbers LIMIT {numbers_limit}",
                params={"numbers_limit": 10000}
            ):
                assert row[0] * 2 == row[1]

        :return: Rows one by one.
        """
        async for row in self._execute(
            query,
            *args,
            json=json,
            query_params=params,
            query_id=query_id,
            decode=decode,
        ):
            yield row

    async def cursor(self, query: str, *args) -> AsyncGenerator[Record, None]:
        """Deprecated. Use ``iterate`` method instead"""
        # stacklevel=2 attributes the warning to the caller's iteration site.
        warnings.warn(
            "'cursor' method is deprecated. Use 'iterate' method instead",
            PendingDeprecationWarning,
            stacklevel=2,
        )
        async for row in self.iterate(query, *args):
            yield row

    async def insert_file(
        self,
        query: str,
        file_obj: BinaryIO,
        params: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Insert data in any format supported by ClickHouse. Returns None.

        :param str query: ClickHouse INSERT query that includes a ``FORMAT`` clause.
        :param file_obj: Data to insert, sent unchanged as the request body —
            e.g. ``bytes`` as in the examples below. Any other object must be
            something the underlying HTTP client accepts as a body.
        :param Optional[Dict[str, Any]] params: Params to escape inside query string.
        :raises ChClientError: If the query is not an INSERT or has no ``FORMAT`` clause.

        Usage:

        .. code-block:: python

            with open('data.csv', 'rb') as f:
                await client.insert_file(
                    "INSERT INTO t FORMAT CSV",
                    f.read(),
                )

            with open('data.json', 'rb') as f:
                await client.insert_file(
                    "INSERT INTO t FORMAT JSONEachRow",
                    f.read(),
                )

            response = requests.get("https://url_to_download_parquet_file")
            await client.insert_file(
                "INSERT INTO t FORMAT Parquet",
                response.content,
            )

        :return: Nothing.
        """
        self._check_insert_file_query(query)
        query_params = self._prepare_query_params(params)
        if query_params:
            query = query.format(**query_params)
        params = {**self.params, "query": query}
        await self._http_client.post_no_return(
            url=self.url,
            params=params,
            headers=self.headers,
            data=file_obj,
        )

    @staticmethod
    def _parse_squery(query):
        """Classify ``query``.

        :return: ``(need_fetch, is_json, statement_type)`` where
            ``need_fetch`` is True for SELECT/SHOW/DESCRIBE/EXISTS/EXPLAIN,
            ``is_json`` is truthy when a ``FORMAT`` keyword is followed by
            ``JSONEachRow``, and ``statement_type`` is sqlparse's
            ``get_type()`` of the first statement.
        """
        statement = sqlparse.parse(query)[0]
        statement_type = statement.get_type()
        if statement_type in ('SELECT', 'SHOW', 'DESCRIBE', 'EXISTS', 'EXPLAIN'):
            need_fetch = True
        else:
            need_fetch = False

        fmt = statement.token_matching(
            (lambda tk: tk.match(sqlparse.tokens.Keyword, 'FORMAT'),), 0
        )
        if fmt:
            is_json = statement.token_matching(
                (lambda tk: tk.match(None, ['JSONEachRow']),),
                statement.token_index(fmt) + 1,
            )
        else:
            is_json = False
        return need_fetch, is_json, statement_type

    @staticmethod
    def _check_insert_file_query(query: str) -> None:
        """Validate that ``query`` is an INSERT with a ``FORMAT`` clause.

        :raises ChClientError: If either condition is not met.
        """
        statement = sqlparse.parse(query)[0]
        if statement.get_type() != 'INSERT':
            raise ChClientError('It is possible to insert file only with INSERT query')
        if not statement.token_matching(
            (lambda tk: tk.match(sqlparse.tokens.Keyword, 'FORMAT'),), 0
        ):
            raise ChClientError(
                'To insert file its required to specify `FORMAT [...] in the query.'
            )