import json as json_
import re
import warnings
from enum import Enum
from types import TracebackType
from typing import Any, AsyncGenerator, BinaryIO, Dict, List, Optional, Type
from aiochclient.binary import (
fetch_column_header,
fetch_column_types,
rows_from_binary,
rows_to_binary,
)
from aiochclient.native import (
blocks_from_native,
rows_from_native,
rows_to_native,
rows_to_native_stream,
)
from aiochclient.exceptions import ChClientError
from aiochclient.http_clients.abc import HttpClientABC
from aiochclient.records import FromJsonFabric, Record, RecordsFabric
from aiochclient.sql import sqlparse
# Optional cython extension: try the compiled ``aiochclient._types``
# converters first, and fall back to the pure-Python ``aiochclient.types``
# module (same three names) when importing the extension raises ImportError.
try:
    from aiochclient._types import json2ch, py2ch, rows2ch
except ImportError:
    from aiochclient.types import json2ch, py2ch, rows2ch
class QueryTypes(Enum):
    """Statement categories with fixed integer values.

    NOTE(review): not referenced anywhere in this module chunk; presumably
    kept for external callers — confirm before changing values.
    """

    FETCH = 0
    INSERT = 1
    OTHER = 2
class ChClient:
    """ChClient connection class.

    Usage:

    .. code-block:: python

        async with aiohttp.ClientSession() as s:
            client = ChClient(s, compress_response=True)
            nums = await client.fetch("SELECT number FROM system.numbers LIMIT 100")

    :param aiohttp.ClientSession session:
        aiohttp client session. Please, use one session
        and one ChClient for all connections in your app.
    :param str url:
        Clickhouse server url. Need full path, like "http://localhost:8123/".
    :param str user:
        User name for authorization.
    :param str password:
        Password for authorization.
    :param str database:
        Database name.
    :param bool compress_response:
        Pass True if you want Clickhouse to compress its responses with gzip.
        They will be decompressed automatically. But overall it will be slightly slower.
    :param bool binary:
        Use the binary ``RowBinary`` engine for results and inserts instead of the
        default text (TSV) format. Faster, and the best choice for streaming
        row-by-row via ``iterate``.
    :param bool native:
        Use the columnar ``Native`` engine — the fastest of the engines for bulk
        SELECT and INSERT, decoding whole columns at once (no numpy). Takes
        precedence over ``binary`` if both are set. Caveats: ``decode=False``
        (raw bytes) is not supported, and the Native format carries no per-column
        timezone, so a tz-aware ``DateTime``/``DateTime64`` is returned as a naive
        UTC ``datetime``.
    :param int insert_block_size:
        Rows per streamed block for a ``native=True`` INSERT (default 8192). The
        body is streamed as Native blocks so the server inserts one while the
        client encodes the next; a multi-block insert is therefore not atomic on
        a client-side encoding error (blocks already sent are committed). Set to 0
        to send a single atomic block (no streaming overlap).
    :param \\*\\*settings:
        Any settings from https://clickhouse.com/docs/en/operations/settings/settings
    """

    __slots__ = (
        "_session",
        "url",
        "params",
        "headers",
        "_json",
        "_http_client",
        "_binary",
        "_native",
        "_insert_block_size",
    )

    def __init__(
        self,
        session=None,
        url: str = "http://localhost:8123/",
        user: Optional[str] = None,
        password: Optional[str] = None,
        database: str = "default",
        compress_response: bool = False,
        json=json_,  # type: ignore
        binary: bool = False,
        native: bool = False,
        insert_block_size: int = 8192,
        **settings,
    ):
        # The concrete HTTP client class is chosen from the session object by
        # HttpClientABC (selection logic is not visible in this module).
        _http_client = HttpClientABC.choose_http_client(session)
        self._http_client = _http_client(session)
        self.url = url
        # Query-string parameters and headers sent with every request.
        self.params = {}
        self.headers = {}
        # Credentials travel in headers rather than in the URL/query string.
        if user:
            self.headers["X-ClickHouse-User"] = user
        if password:
            self.headers["X-ClickHouse-Key"] = password
        if database:
            self.params["database"] = database
        if compress_response:
            self.params["enable_http_compression"] = 1
        # Module/object providing ``dumps``/``loads`` for JSONEachRow mode.
        self._json = json
        # Decode SELECT results with the RowBinary / Native engine instead of TSV.
        self._binary = binary
        self._native = native
        # Rows per streamed Native-INSERT block. Streaming overlaps the
        # server-side insert with the next block's encoding, but a multi-block
        # body is not atomic on a client-side encoding error. Set to 0 to send
        # one atomic block instead (no overlap). Inserts of fewer rows than this
        # are a single block regardless, so they stay atomic.
        self._insert_block_size = insert_block_size
        # Extra keyword arguments become ClickHouse settings in the query string
        # and may override the parameters set above.
        self.params.update(settings)

    async def __aenter__(self) -> 'ChClient':
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        await self.close()

    async def close(self) -> None:
        """Close the session (delegates to the underlying HTTP client)."""
        await self._http_client.close()

    async def is_alive(self) -> bool:
        """Checks if connection is Ok.

        Usage:

        .. code-block:: python

            assert await client.is_alive()

        :return: True if a ``SELECT 1`` request succeeds; False if it raises
            ``ChClientError``. Other exceptions propagate.
        """
        try:
            await self._http_client.get(
                url=self.url,
                params={**self.params, "query": "SELECT 1"},
                headers=self.headers,
            )
        except ChClientError:
            return False
        return True

    @staticmethod
    def _prepare_query_params(params: Optional[Dict[str, Any]] = None):
        """Render each value of ``params`` as escaped ClickHouse literal text.

        :param params: Mapping of ``str.format`` placeholder names to Python
            values, or None.
        :return: A new dict with the same keys and ``py2ch``-rendered,
            UTF-8-decoded values; ``{}`` when ``params`` is None.
        :raises TypeError: If ``params`` is neither None nor a dict.
        """
        if params is None:
            return {}
        if not isinstance(params, dict):
            raise TypeError('Query params must be a Dict[str, Any]')
        return {key: py2ch(value).decode('utf-8') for key, value in params.items()}

    async def _execute(
        self,
        query: str,
        *args,
        json: bool = False,
        query_params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
        decode: bool = True,
    ) -> AsyncGenerator[Record, None]:
        """Send ``query`` and yield result records (none for non-fetch queries).

        :param query: SQL text; ``{name}`` placeholders are filled from
            ``query_params`` via ``str.format``.
        :param args: Rows to insert; only allowed for INSERT statements.
        :param json: Force JSONEachRow mode.
        :param query_params: Values to escape into the query placeholders.
        :param query_id: Optional ClickHouse ``query_id``.
        :param decode: Only used by the TSV path; the Native/RowBinary
            drivers do not receive it.
        :raises ChClientError: If ``args`` are given for a non-INSERT query.
        """
        query_params = self._prepare_query_params(query_params)
        if query_params:
            query = query.format(**query_params)
        need_fetch, is_json, statement_type = self._parse_squery(query)
        if not is_json and json:
            query += " FORMAT JSONEachRow"
            is_json = True
        # Pick the result format for fetch queries that don't already request JSON.
        if not is_json and need_fetch:
            if self._native:
                query += " FORMAT Native"
            elif self._binary:
                query += " FORMAT RowBinaryWithNamesAndTypes"
            else:
                query += " FORMAT TSVWithNamesAndTypes"
        if args:
            if statement_type != 'INSERT':
                raise ChClientError(
                    "It is possible to pass arguments only for INSERT queries"
                )
            if self._native and not is_json:
                # Encode the rows column-by-column as Native, using the
                # target column names and types.
                names, types = await self._fetch_insert_column_header(query)
                query = self._columnar_insert_query(query, "Native")
                if self._insert_block_size:
                    # Stream the body as multiple Native blocks so the server
                    # inserts one while the client encodes the next.
                    data = rows_to_native_stream(
                        args, names, types, self._insert_block_size
                    )
                else:
                    # One atomic block: encode fully, then send.
                    data = rows_to_native(args, names, types)
            elif self._binary and not is_json:
                # RowBinary is type-specific, so fetch the target column types
                # and encode the rows to match them exactly.
                types = await self._fetch_insert_column_types(query)
                query = self._columnar_insert_query(query, "RowBinary")
                data = rows_to_binary(args, types)
            elif is_json:
                data = json2ch(*args, dumps=self._json.dumps)
            else:
                data = rows2ch(*args)
            # With a row body, the SQL itself travels in the query string.
            params = {**self.params, "query": query}
        else:
            # Without rows, the SQL text is the request body.
            params = {**self.params}
            data = query.encode()
        if query_id is not None:
            params["query_id"] = query_id
        if need_fetch:
            if (self._native or self._binary) and not is_json:
                source = self._http_client.post_return_bytes(
                    url=self.url, params=params, headers=self.headers, data=data
                )
                driver = rows_from_native if self._native else rows_from_binary
                async for record in driver(source):
                    yield record
                return
            response = self._http_client.post_return_lines(
                url=self.url, params=params, headers=self.headers, data=data
            )
            if is_json:
                rf = FromJsonFabric(loads=self._json.loads)
                async for line in response:
                    yield rf.new(line)
            else:
                # TSVWithNamesAndTypes: the first two lines are the column
                # names and the column types; the rest are data rows.
                rf = RecordsFabric(
                    names=await response.__anext__(),
                    tps=await response.__anext__(),
                    convert=decode,
                )
                async for line in response:
                    yield rf.new(line)
        else:
            await self._http_client.post_no_return(
                url=self.url, params=params, headers=self.headers, data=data
            )

    def _native_block_stream(
        self,
        query: str,
        params: Optional[Dict[str, Any]],
        query_id: Optional[str],
    ):
        """Native-block source for a fetch-style query, or ``None`` if N/A.

        ``fetch`` collects a whole result into a list, so it can consume the
        Native stream a block at a time and ``list.extend`` each block — keeping
        the per-row work out of the (much costlier) async-generator protocol.
        Returns ``None`` for queries that don't fetch in Native (so the caller
        falls back to the generic row path).
        """
        query_params = self._prepare_query_params(params)
        if query_params:
            query = query.format(**query_params)
        need_fetch, is_json, _ = self._parse_squery(query)
        if not need_fetch or is_json:
            return None
        query += " FORMAT Native"
        request_params = {**self.params}
        if query_id is not None:
            request_params["query_id"] = query_id
        source = self._http_client.post_return_bytes(
            url=self.url,
            params=request_params,
            headers=self.headers,
            data=query.encode(),
        )
        return blocks_from_native(source)

    @staticmethod
    def _columnar_insert_query(query: str, fmt: str) -> str:
        """Replace the trailing ``VALUES ...`` clause with ``FORMAT <fmt>``.

        Everything from the first whitespace-preceded ``VALUES`` keyword
        (case-insensitive) to the end of the string is dropped, so the binary
        body is parsed as RowBinary/Native rather than as VALUES tuples.
        """
        head = re.sub(r"\s+VALUES\b.*$", "", query, flags=re.IGNORECASE | re.DOTALL)
        return head.rstrip() + f" FORMAT {fmt}"

    def _insert_column_probe(self, query: str):
        """Start a ``LIMIT 0`` probe that reports the INSERT target's columns.

        The target table (optionally ``db.``-qualified, optionally preceded by
        ClickHouse's ``TABLE`` keyword) and its explicit column list, if any,
        are parsed from ``query``. The probe selects those columns (or ``*``)
        in RowBinaryWithNamesAndTypes so the header carries their names and
        exact types, in the order the INSERT lists them.

        :raises ChClientError: If the INSERT target cannot be parsed.
        """
        match = re.match(
            r"\s*INSERT\s+INTO\s+(?:TABLE\s+)?(?P<table>[^\s(]+)\s*(?:\((?P<cols>[^)]*)\))?",
            query,
            re.IGNORECASE,
        )
        if not match:
            raise ChClientError("Could not parse the INSERT target for binary mode")
        cols = match.group("cols")
        probe = (
            f"SELECT {cols.strip() if cols else '*'} "
            f"FROM {match.group('table')} LIMIT 0 FORMAT RowBinaryWithNamesAndTypes"
        )
        return self._http_client.post_return_bytes(
            url=self.url,
            params={**self.params},
            headers=self.headers,
            data=probe.encode(),
        )

    async def _fetch_insert_column_types(self, query: str) -> list:
        """Return the INSERT target's column types (used by RowBinary inserts)."""
        return await fetch_column_types(self._insert_column_probe(query))

    async def _fetch_insert_column_header(self, query: str):
        """Return ``(names, types)`` of the INSERT target (used by Native inserts)."""
        return await fetch_column_header(self._insert_column_probe(query))

    async def execute(
        self,
        query: str,
        *args,
        json: bool = False,
        params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
    ) -> None:
        """Execute query. Returns None.

        :param str query: Clickhouse query string.
        :param args: Arguments for insert queries.
        :param bool json: Execute query in JSONEachRow mode.
        :param Optional[Dict[str, Any]] params: Params to escape
            inside query string on field values.
        :param str query_id: Clickhouse query_id.

        Usage:

        .. code-block:: python

            await client.execute(
                '''CREATE TABLE t (a UInt8,
                                   b Tuple(Date, Nullable(Float32))
                                   ) ENGINE = Memory'''
            )
            await client.execute(
                "INSERT INTO t VALUES",
                (1, (dt.date(2018, 9, 7), None)),
                (2, (dt.date(2018, 9, 8), 3.14)),
            )
            await client.execute(
                "SELECT * FROM t WHERE a={u8}",
                params={"u8": 12}
            )

        :return: Nothing.
        """
        # Non-fetch queries yield nothing, so the loop just drives the request;
        # for a fetch query the first row ends the call.
        async for _ in self._execute(
            query, *args, json=json, query_params=params, query_id=query_id
        ):
            return None

    async def fetch(
        self,
        query: str,
        *args,
        json: bool = False,
        params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
        decode: bool = True,
    ) -> List[Record]:
        """Execute query and fetch all rows from query result at once in a list.

        :param query: Clickhouse query string.
        :param bool json: Execute query in JSONEachRow mode.
        :param Optional[Dict[str, Any]] params: Params to escape inside query string.
        :param str query_id: Clickhouse query_id.
        :param decode: Decode to python types.
            If False, returns bytes for each field instead
            (not honoured in ``native`` mode).

        Usage:

        .. code-block:: python

            all_rows = await client.fetch("SELECT * FROM t")

        :return: All rows from query.
        """
        if self._native and not json and not args:
            # Fast path: extend the result a whole Native block at a time.
            blocks = self._native_block_stream(query, params, query_id)
            if blocks is not None:
                result: List[Record] = []
                async for block in blocks:
                    result.extend(block)
                return result
        return [
            row
            async for row in self._execute(
                query,
                *args,
                json=json,
                query_params=params,
                query_id=query_id,
                decode=decode,
            )
        ]

    async def fetchrow(
        self,
        query: str,
        *args,
        json: bool = False,
        params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
        decode: bool = True,
    ) -> Optional[Record]:
        """Execute query and fetch first row from query result or None.

        :param query: Clickhouse query string.
        :param bool json: Execute query in JSONEachRow mode.
        :param Optional[Dict[str, Any]] params: Params to escape inside query string.
        :param str query_id: Clickhouse query_id.
        :param decode: Decode to python types. If False,
            returns bytes for each field instead.

        Usage:

        .. code-block:: python

            row = await client.fetchrow("SELECT * FROM t WHERE a=1")
            assert row[0] == 1
            assert row["b"] == (dt.date(2018, 9, 7), None)

        :return: First row from query or None if there no results.
        """
        async for row in self._execute(
            query,
            *args,
            json=json,
            query_params=params,
            query_id=query_id,
            decode=decode,
        ):
            return row
        return None

    async def fetchone(self, query: str, *args) -> Optional[Record]:
        """Deprecated. Use ``fetchrow`` method instead"""
        # stacklevel=2 attributes the warning to the caller, not this module.
        warnings.warn(
            "'fetchone' method is deprecated. Use 'fetchrow' method instead",
            PendingDeprecationWarning,
            stacklevel=2,
        )
        return await self.fetchrow(query, *args)

    async def fetchval(
        self,
        query: str,
        *args,
        json: bool = False,
        params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
        decode: bool = True,
    ) -> Any:
        """Execute query and fetch first value of the first
        row from query result or None.

        :param query: Clickhouse query string.
        :param bool json: Execute query in JSONEachRow mode.
        :param Optional[Dict[str, Any]] params: Params to escape inside query string.
        :param str query_id: Clickhouse query_id.
        :param decode: Decode to python types.
            If False, returns bytes for each field instead.

        Usage:

        .. code-block:: python

            val = await client.fetchval("SELECT b FROM t WHERE a=2")
            assert val == (dt.date(2018, 9, 8), 3.14)

        :return: First value of the first row or None if there no results.
        """
        async for row in self._execute(
            query,
            *args,
            json=json,
            query_params=params,
            query_id=query_id,
            decode=decode,
        ):
            # Falsy (empty) records are skipped rather than indexed.
            if row:
                return row[0]
        return None

    async def iterate(
        self,
        query: str,
        *args,
        json: bool = False,
        params: Optional[Dict[str, Any]] = None,
        query_id: Optional[str] = None,
        decode: bool = True,
    ) -> AsyncGenerator[Record, None]:
        """Async generator by all rows from query result.

        :param str query: Clickhouse query string.
        :param bool json: Execute query in JSONEachRow mode.
        :param Optional[Dict[str, Any]] params: Params to escape inside query string.
        :param str query_id: Clickhouse query_id.
        :param decode: Decode to python types.
            If False, returns bytes for each field instead.

        Usage:

        .. code-block:: python

            async for row in client.iterate(
                "SELECT number, number*2 FROM system.numbers LIMIT 10000"
            ):
                assert row[0] * 2 == row[1]

            async for row in client.iterate(
                "SELECT number, number*2 FROM system.numbers LIMIT {numbers_limit}",
                params={"numbers_limit": 10000}
            ):
                assert row[0] * 2 == row[1]

        :return: Rows one by one.
        """
        async for row in self._execute(
            query,
            *args,
            json=json,
            query_params=params,
            query_id=query_id,
            decode=decode,
        ):
            yield row

    async def cursor(self, query: str, *args) -> AsyncGenerator[Record, None]:
        """Deprecated. Use ``iterate`` method instead"""
        # stacklevel=2 attributes the warning to the caller, not this module.
        warnings.warn(
            "'cursor' method is deprecated. Use 'iterate' method instead",
            PendingDeprecationWarning,
            stacklevel=2,
        )
        async for row in self.iterate(query, *args):
            yield row

    async def insert_file(
        self,
        query: str,
        file_obj: BinaryIO,
        params: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Insert file in any format supported by ClickHouse. Returns None.

        :param str query: Clickhouse INSERT query string which includes the
            ``FORMAT`` part.
        :param file_obj: Data to insert; it is passed unchanged as the request
            body (the examples below pass ``bytes``).
        :param Optional[Dict[str, Any]] params: Params to escape inside query string.
        :raises ChClientError: If ``query`` is not an INSERT or lacks ``FORMAT``.

        Usage:

        .. code-block:: python

            with open('data.csv', 'rb') as f:
                await client.insert_file(
                    "INSERT INTO t FORMAT CSV",
                    f.read(),
                )

            with open('data.json', 'rb') as f:
                await client.insert_file(
                    "INSERT INTO t FORMAT JSONEachRow",
                    f.read(),
                )

            response = requests.get("https://url_to_download_parquet_file")
            await client.insert_file(
                "INSERT INTO t FORMAT Parquet",
                response.content,
            )

        :return: Nothing.
        """
        # Validation runs on the raw query text, before placeholders are filled.
        self._check_insert_file_query(query)
        query_params = self._prepare_query_params(params)
        if query_params:
            query = query.format(**query_params)
        params = {**self.params, "query": query}
        await self._http_client.post_no_return(
            url=self.url,
            params=params,
            headers=self.headers,
            data=file_obj,
        )

    @staticmethod
    def _parse_squery(query):
        """Classify ``query`` with sqlparse.

        :return: ``(need_fetch, is_json, statement_type)`` where ``need_fetch``
            is True for SELECT/SHOW/DESCRIBE/EXISTS/EXPLAIN, ``is_json`` is the
            ``JSONEachRow`` token found after a ``FORMAT`` keyword (truthy) or a
            falsy value otherwise, and ``statement_type`` is sqlparse's
            ``get_type()`` result.
        """
        statement = sqlparse.parse(query)[0]
        statement_type = statement.get_type()
        need_fetch = statement_type in (
            'SELECT', 'SHOW', 'DESCRIBE', 'EXISTS', 'EXPLAIN'
        )
        fmt = statement.token_matching(
            (lambda tk: tk.match(sqlparse.tokens.Keyword, 'FORMAT'),), 0
        )
        if fmt:
            # Look for JSONEachRow in the tokens after FORMAT.
            is_json = statement.token_matching(
                (lambda tk: tk.match(None, ['JSONEachRow']),),
                statement.token_index(fmt) + 1,
            )
        else:
            is_json = False
        return need_fetch, is_json, statement_type

    @staticmethod
    def _check_insert_file_query(query: str) -> None:
        """Ensure ``query`` is an INSERT that names an explicit ``FORMAT``.

        :raises ChClientError: If either condition is not met.
        """
        statement = sqlparse.parse(query)[0]
        if statement.get_type() != 'INSERT':
            raise ChClientError('It is possible to insert file only with INSERT query')
        if not statement.token_matching(
            (lambda tk: tk.match(sqlparse.tokens.Keyword, 'FORMAT'),), 0
        ):
            raise ChClientError(
                'To insert file its required to specify `FORMAT [...] in the query.'
            )