import logging
import os
import time
from typing import Optional

import pyarrow as pa
from pyarrow import flight

# ``__name__`` (not ``__file__``) yields the dotted module path, so logging
# configuration keyed on the package/module name applies to this logger.
logger = logging.getLogger(__name__)


def initialize_flight(flight_uri: str, username: str, password: str):
    """Open a Flight client and authenticate it with basic credentials.

    Parameters
    ----------
    flight_uri : str
        Location of the Flight server, e.g. ``grpc+tls://host:32010``.
    username : str
        User name for basic authentication.
    password : str
        Password for basic authentication.

    Returns
    -------
    client : flight.FlightClient
        The connected client.
    auth_header : tuple
        The (key, value) header pair returned by
        ``FlightClient.authenticate_basic_token``; pass it on to
        ``read_query_arrow`` / ``read_query_pandas``.
    """
    client = flight.FlightClient(flight_uri)
    initial_options = flight.FlightCallOptions(headers=[])
    auth_header = client.authenticate_basic_token(username, password, initial_options)
    return client, auth_header


def _iter_flight_chunks(readers, as_tables: bool, tqdm=None):
    """Yield the record batches of each reader in order (as Tables if ``as_tables``)."""
    pbar = None if tqdm is None else tqdm(desc="Flight Transfer Chunk")
    try:
        for reader in readers:
            while True:
                try:
                    batch, _ = reader.read_chunk()
                except StopIteration:
                    # This stream is exhausted; continue with the next reader.
                    break
                # pyarrow reports ``data`` as None for chunks that carry only
                # application metadata; there is nothing to yield for those.
                if batch is None:
                    continue
                # Count a chunk only once it has actually been received.
                if pbar is not None:
                    pbar.update()
                yield pa.Table.from_batches([batch]) if as_tables else batch
    finally:
        # Runs on exhaustion, on error, and when the consumer closes the
        # generator early, so the progress bar is always released.
        if pbar is not None:
            pbar.close()


def read_query_arrow(
    client: flight.FlightClient,
    auth_header: tuple,
    query: str,
    return_iterator: bool = False,
    tqdm=None,
    return_only_info: bool = False,
):
    """Load a Dremio query into an Arrow Table using Flight.

    Parameters
    ----------
    client : flight.FlightClient
        The flight client to be used.
    auth_header : (str, str)
        The bearer-token header pair returned by ``initialize_flight``.
    query : str
        The Dremio query to be executed.
    return_iterator : bool, optional
        If True, an iterator yielding one Arrow Table per received record
        batch is returned instead of a single Table. Default: False
    tqdm : tqdm.tqdm, optional
        tqdm class used for a per-chunk progress bar.
    return_only_info : bool, optional
        If True, only the ``flight.FlightInfo`` is returned and no data is
        transferred. Useful to get the schema of a query without loading it.
        Default: False

    Returns
    -------
    tbl : pyarrow.Table
        The query result. If ``return_iterator`` is True, an iterator yielding
        Arrow Tables instead; if ``return_only_info`` is True, the FlightInfo.
    """
    options = flight.FlightCallOptions(headers=[auth_header])
    flight_desc = flight.FlightDescriptor.for_command(command=query)
    flight_info = client.get_flight_info(flight_desc, options)
    if return_only_info:
        return flight_info

    # The result set may be split over several endpoints; read all of them in
    # order rather than only the first, so no rows are silently dropped.
    # Streams are opened here (not lazily) so that do_get errors surface at
    # call time, also in iterator mode.
    # NOTE(review): endpoint.locations is ignored -- every ticket is redeemed
    # on ``client``. Confirm this holds for the target server setup.
    readers = [
        client.do_get(endpoint.ticket, options) for endpoint in flight_info.endpoints
    ]
    chunks = _iter_flight_chunks(readers, as_tables=return_iterator, tqdm=tqdm)
    if return_iterator:
        return chunks

    start = time.perf_counter()
    batches = list(chunks)
    if batches:
        data = pa.Table.from_batches(batches)
    else:
        # ``from_batches`` cannot infer a schema from zero batches (empty
        # result), so take it from the stream, or from the FlightInfo if the
        # server returned no endpoints at all.
        schema = readers[0].schema if readers else flight_info.schema
        data = pa.Table.from_batches([], schema=schema)
    logger.debug(
        "Loaded query %s via flight in %.2fs.", query, time.perf_counter() - start
    )
    return data


def read_query_pandas(
    client: flight.FlightClient,
    auth_header: tuple,
    query: str,
    pandas_options: Optional[dict] = None,
    return_iterator: bool = False,
    tqdm=None,
):
    """Load a Dremio query into a pandas DataFrame using Flight.

    Parameters
    ----------
    client : flight.FlightClient
        The flight client to be used.
    auth_header : (str, str)
        The bearer-token header pair returned by ``initialize_flight``.
    query : str
        The Dremio query to be executed.
    pandas_options : dict, optional
        Additional kwargs passed to ``pyarrow.Table.to_pandas``.
    return_iterator : bool, optional
        If True, an iterator yielding one DataFrame per received record batch
        is returned instead of a single DataFrame. Default: False
    tqdm : tqdm.tqdm, optional
        tqdm class used for a per-chunk progress bar.

    Returns
    -------
    df : pandas.DataFrame
        The query result. If ``return_iterator`` is True, an iterator yielding
        pandas DataFrames instead.
    """
    pandas_options = pandas_options or {}
    result = read_query_arrow(
        query=query,
        auth_header=auth_header,
        client=client,
        return_iterator=return_iterator,
        tqdm=tqdm,
    )
    if return_iterator:
        return (tbl.to_pandas(**pandas_options) for tbl in result)
    return result.to_pandas(**pandas_options)


if __name__ == "__main__":
    # Credentials are read from the environment instead of being committed
    # to source control.
    client, auth_header = initialize_flight(
        flight_uri=os.environ.get(
            "DREMIO_FLIGHT_URI",
            "grpc+tls://dremio-internal-sql.data-platform-dev.stackit.run:32010",
        ),
        username=os.environ["DREMIO_USERNAME"],
        password=os.environ["DREMIO_PASSWORD"],
    )
    data = read_query_pandas(
        client,
        auth_header,
        query="SELECT 1",
    )
    print(data)