aiochclient.ChClient

class aiochclient.ChClient(session=None, url: str = 'http://localhost:8123/', user: str = None, password: str = None, database: str = 'default', compress_response: bool = False, json=<module 'json' from '/home/docs/.asdf/installs/python/3.11.14/lib/python3.11/json/__init__.py'>, binary: bool = False, native: bool = False, insert_block_size: int = 8192, **settings)[source]

ChClient connection class.

Usage:

async with aiohttp.ClientSession() as s:
    client = ChClient(s, compress_response=True)
    nums = await client.fetch("SELECT number FROM system.numbers LIMIT 100")
Parameters:
  • session (aiohttp.ClientSession) – aiohttp client session. Please, use one session and one ChClient for all connections in your app.

  • url (str) – Clickhouse server url. Need full path, like “http://localhost:8123/”.

  • user (str) – User name for authorization.

  • password (str) – Password for authorization.

  • database (str) – Database name.

  • compress_response (bool) – Pass True if you want Clickhouse to compress its responses with gzip. They will be decompressed automatically. But overall it will be slightly slower.

  • binary (bool) – Use the binary RowBinary engine for results and inserts instead of the default text (TSV) format. Faster, and the best choice for streaming row-by-row via iterate.

  • native (bool) – Use the columnar Native engine — the fastest of the engines for bulk SELECT and INSERT, decoding whole columns at once (no numpy). Takes precedence over binary if both are set. Caveats: decode=False (raw bytes) is not supported, and the Native format carries no per-column timezone, so a tz-aware DateTime/DateTime64 is returned as a naive UTC datetime.

  • insert_block_size (int) – Rows per streamed block for a native=True INSERT (default 8192). The body is streamed as Native blocks so the server inserts one while the client encodes the next; a multi-block insert is therefore not atomic on a client-side encoding error (blocks already sent are committed). Set to 0 to send a single atomic block (no streaming overlap).

  • **settings – Any settings from https://clickhouse.com/docs/en/operations/settings/settings

async close() None[source]

Close the session

async cursor(query: str, *args) AsyncGenerator[Record, None][source]

Deprecated. Use iterate method instead

async execute(query: str, *args, json: bool = False, params: Dict[str, Any] | None = None, query_id: str = None) None[source]

Execute query. Returns None.

Parameters:
  • query (str) – Clickhouse query string.

  • args – Arguments for insert queries.

  • json (bool) – Execute query in JSONEachRow mode.

  • params (Optional[Dict[str, Any]]) – Params to escape inside query string on field values.

  • query_id (str) – Clickhouse query_id.

Usage:

await client.execute(
    "CREATE TABLE t (a UInt8,
                     b Tuple(Date, Nullable(Float32))
                     ) ENGINE = Memory"
)
await client.execute(
    "INSERT INTO t VALUES",
    (1, (dt.date(2018, 9, 7), None)),
    (2, (dt.date(2018, 9, 8), 3.14)),
)
await client.execute(
    "SELECT * FROM t WHERE a={u8}",
    params={"u8": 12}
)
Returns:

Nothing.

async fetch(query: str, *args, json: bool = False, params: Dict[str, Any] | None = None, query_id: str = None, decode: bool = True) List[Record][source]

Execute query and fetch all rows from query result at once in a list.

Parameters:
  • query – Clickhouse query string.

  • json (bool) – Execute query in JSONEachRow mode.

  • params (Optional[Dict[str, Any]]) – Params to escape inside query string.

  • query_id (str) – Clickhouse query_id.

  • decode – Decode to python types. If False, returns bytes for each field instead.

Usage:

all_rows = await client.fetch("SELECT * FROM t")
Returns:

All rows from query.

async fetchone(query: str, *args) Record | None[source]

Deprecated. Use fetchrow method instead

async fetchrow(query: str, *args, json: bool = False, params: Dict[str, Any] | None = None, query_id: str = None, decode: bool = True) Record | None[source]

Execute query and fetch first row from query result or None.

Parameters:
  • query – Clickhouse query string.

  • json (bool) – Execute query in JSONEachRow mode.

  • params (Optional[Dict[str, Any]]) – Params to escape inside query string.

  • query_id (str) – Clickhouse query_id.

  • decode – Decode to python types. If False, returns bytes for each field instead.

Usage:

row = await client.fetchrow("SELECT * FROM t WHERE a=1")
assert row[0] == 1
assert row["b"] == (dt.date(2018, 9, 7), None)
Returns:

First row from query or None if there no results.

async fetchval(query: str, *args, json: bool = False, params: Dict[str, Any] | None = None, query_id: str = None, decode: bool = True) Any[source]
Execute query and fetch first value of the first

row from query result or None.

Parameters:
  • query – Clickhouse query string.

  • json (bool) – Execute query in JSONEachRow mode.

  • params (Optional[Dict[str, Any]]) – Params to escape inside query string.

  • query_id (str) – Clickhouse query_id.

  • decode – Decode to python types. If False, returns bytes for each field instead.

Usage:

val = await client.fetchval("SELECT b FROM t WHERE a=2")
assert val == (dt.date(2018, 9, 8), 3.14)
Returns:

First value of the first row or None if there no results.

async insert_file(query: str, file_obj: BinaryIO, params: Dict[str, Any] | None = None) None[source]

Insert file in any suppoted by ClickHouse format. Returns None.

Parameters:
  • query (str) – Clickhouse query string which include format part.

  • file_obj (bool) – File object to insert.

  • params (Optional[Dict[str, Any]]) – Params to escape inside query string.

Usage:

with open('data.csv', 'rb') as f:
    await client.insert_file(
        "INSERT INTO t FORMAT CSV",
        f.read(),
    )

with open('data.json', 'rb') as f:
    await client.insert_file(
        "INSERT INTO t FORMAT JSONEachRow",
        f.read(),
    )

response = requests.get("https://url_to_download_parquet_file")
await client.insert_file(
    "INSERT INTO t FORMAT Parquet",
    response.content,
)
Returns:

Nothing.

async is_alive() bool[source]

Checks if connection is Ok.

Usage:

assert await client.is_alive()
Returns:

True if connection Ok. False instead.

async iterate(query: str, *args, json: bool = False, params: Dict[str, Any] | None = None, query_id: str = None, decode: bool = True) AsyncGenerator[Record, None][source]

Async generator by all rows from query result.

Parameters:
  • query (str) – Clickhouse query string.

  • json (bool) – Execute query in JSONEachRow mode.

  • params (Optional[Dict[str, Any]]) – Params to escape inside query string.

  • query_id (str) – Clickhouse query_id.

  • decode – Decode to python types. If False, returns bytes for each field instead.

Usage:

async for row in client.iterate(
    "SELECT number, number*2 FROM system.numbers LIMIT 10000"
):
    assert row[0] * 2 == row[1]

async for row in client.iterate(
    "SELECT number, number*2 FROM system.numbers LIMIT {numbers_limit}",
    params={"numbers_limit": 10000}
):
    assert row[0] * 2 == row[1]
Returns:

Rows one by one.