# workflows-example-dags/Demo/tools/execute_dremio_query.py
# 2026-05-28 17:44:11 +02:00
#
# 150 lines
# 4.2 KiB
# Python

import time
import pyarrow as pa
from pyarrow import flight
import logging
# Name the logger after the module (not its file path) so it joins the dotted
# logger hierarchy and can be configured per package/module.
logger = logging.getLogger(__name__)
def initialize_flight(flight_uri: str, username, password):
    """Connect to a Flight server and authenticate with basic credentials.

    Parameters
    ----------
    flight_uri : str
        Location of the Flight server, e.g. ``grpc+tls://host:port``.
    username, password
        Credentials handed to ``FlightClient.authenticate_basic_token``.

    Returns
    -------
    (flight.FlightClient, tuple)
        The client and the header pair returned by
        ``authenticate_basic_token``, meant to be sent with later calls.
    """
    flight_client = flight.FlightClient(flight_uri)
    # The authentication handshake itself carries no extra headers.
    handshake_options = flight.FlightCallOptions(headers=[])
    token_pair = flight_client.authenticate_basic_token(
        username, password, handshake_options
    )
    return flight_client, token_pair
def read_query_arrow(
    client: flight.FlightClient,
    auth_header: tuple,
    query: str,
    return_iterator: bool = False,
    tqdm=None,
):
    """Load a Dremio query result into Arrow using Flight.

    Parameters
    ----------
    client : flight.FlightClient
        The flight client to be used.
    auth_header : tuple
        The ``(key, value)`` authorization header pair, e.g. as returned by
        ``FlightClient.authenticate_basic_token``.
    query : str
        The Dremio SQL query to be executed.
    return_iterator : bool, optional
        If True, return an iterator yielding one ``pyarrow.Table`` per
        transferred record batch instead of a single table. Default: False.
    tqdm : callable, optional
        A ``tqdm.tqdm``-compatible factory used to display a per-chunk
        progress bar. No progress bar is shown when None.

    Returns
    -------
    tbl : pyarrow.Table or Iterator[pyarrow.Table]
        The query result. If ``return_iterator`` is True, an iterator yielding
        Arrow Tables is returned instead.
    """
    options = flight.FlightCallOptions(headers=[auth_header])
    flight_desc = flight.FlightDescriptor.for_command(command=query)
    flight_info = client.get_flight_info(flight_desc, options)

    # A Flight result may be split across several endpoints; read all of them
    # rather than only the first. Streams are opened eagerly so request errors
    # surface when this function is called, not on first iteration.
    readers = [
        client.do_get(endpoint.ticket, options)
        for endpoint in flight_info.endpoints
    ]
    # An explicit schema lets an empty result become an empty table instead of
    # ``Table.from_batches`` failing for lack of any batch to infer it from.
    schema = readers[0].schema if readers else flight_info.schema

    batches = _iter_flight_chunks(readers, as_tables=return_iterator, tqdm=tqdm)
    if return_iterator:
        return batches

    start = time.perf_counter()
    data = pa.Table.from_batches(batches, schema=schema)
    logger.debug(
        "Loaded query %s via flight in %.2fs.", query, time.perf_counter() - start
    )
    return data


def _iter_flight_chunks(readers, as_tables=False, tqdm=None):
    """Yield every record batch from ``readers`` in order, optionally as tables."""
    pbar = None if tqdm is None else tqdm(desc="Flight Transfer Chunk")
    try:
        for reader in readers:
            while True:
                # read_chunk signals end-of-stream with StopIteration; keep the
                # try narrow so nothing raised around the yield is swallowed.
                try:
                    batch, _ = reader.read_chunk()
                except StopIteration:
                    break
                # Count only chunks actually received.
                if pbar is not None:
                    pbar.update()
                yield pa.Table.from_batches([batch]) if as_tables else batch
    finally:
        # Runs on exhaustion, on error, and when an abandoned iterator is closed.
        if pbar is not None:
            pbar.close()
def read_query_pandas(
    client: flight.FlightClient,
    auth_header: tuple,
    query,
    pandas_options: dict = None,
    return_iterator: bool = False,
    tqdm=None,
):
    """Load a Dremio query result into pandas using Flight.

    The result is fetched as Arrow via ``read_query_arrow`` and converted with
    ``pyarrow.Table.to_pandas``.

    Parameters
    ----------
    client : flight.FlightClient
        The flight client to be used.
    auth_header : tuple
        The ``(key, value)`` authorization header pair, e.g. as returned by
        ``FlightClient.authenticate_basic_token``.
    query : str
        The Dremio SQL query to be executed.
    pandas_options : dict, optional
        Additional keyword arguments passed to ``pyarrow.Table.to_pandas``.
    return_iterator : bool, optional
        If True, return an iterator yielding DataFrames (one per Arrow table
        produced by ``read_query_arrow``) instead of a single DataFrame.
        Default: False.
    tqdm : callable, optional
        A ``tqdm.tqdm``-compatible factory used for a per-chunk progress bar.

    Returns
    -------
    df : pandas.DataFrame or Iterator[pandas.DataFrame]
        The query result. If ``return_iterator`` is True, an iterator yielding
        pandas DataFrames is returned instead.
    """
    pandas_options = pandas_options or {}
    result = read_query_arrow(
        query=query,
        auth_header=auth_header,
        client=client,
        return_iterator=return_iterator,
        tqdm=tqdm,
    )
    if return_iterator:
        # Each item is an Arrow table; convert lazily as the caller iterates.
        return (tbl.to_pandas(**pandas_options) for tbl in result)
    return result.to_pandas(**pandas_options)
if __name__ == "__main__":
    import getpass
    import os

    # Credentials come from the environment (or an interactive prompt) instead
    # of being hard-coded, so they never end up in version control.
    flight_uri = os.environ.get(
        "DREMIO_FLIGHT_URI",
        "grpc+tls://dremio-internal-sql.data-platform-dev.stackit.run:32010",
    )
    username = os.environ.get("DREMIO_USERNAME") or input("Dremio username: ")
    password = os.environ.get("DREMIO_PASSWORD") or getpass.getpass(
        "Dremio password: "
    )

    client, auth_header = initialize_flight(
        flight_uri=flight_uri,
        username=username,
        password=password,
    )
    data = read_query_pandas(
        client,
        auth_header,
        query="SELECT 1",
    )
    print(data)