# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
import datetime
import pickle
import uuid
from typing import List, Union
import cloudpickle
import numpy as np
import pandas as pd
from ... import opcodes as OperandDef
from ...config import options
from ...core.context import Context
from ...core.operand import OperatorLogicKeyGeneratorMixin
from ...serialization.serializables import (
AnyField,
BoolField,
BytesField,
Float64Field,
Int64Field,
ListField,
StringField,
)
from ...tensor.utils import normalize_chunk_sizes
from ...typing import OperandType, TileableType
from ..utils import arrow_dtype_kwargs, create_sa_connection, is_pandas_2, parse_index
from .core import (
ColumnPruneSupportedDataSourceMixin,
IncrementalIndexDatasource,
IncrementalIndexDataSourceMixin,
)
class DataFrameReadSQLLogicKeyGenerator(OperatorLogicKeyGeneratorMixin):
def _get_logic_key_token_values(self):
fields_to_tokenize = [
getattr(self, k, None)
for k in [
"table_or_sql",
"schema",
"coerce_float",
"parse_dates",
"columns",
"method",
"incremental_index",
"use_arrow_dtype",
"partition_col",
]
]
return super()._get_logic_key_token_values() + fields_to_tokenize
class DataFrameReadSQL(
IncrementalIndexDatasource,
ColumnPruneSupportedDataSourceMixin,
IncrementalIndexDataSourceMixin,
DataFrameReadSQLLogicKeyGenerator,
):
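    """
    Operand that reads a SQL table or query into a DataFrame.

    Depending on ``method``, the result set is split into chunks either with
    ``OFFSET``/``LIMIT`` clauses ("offset") or by partitioning the value range
    of ``partition_col`` ("partition").
    """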
_op_type_ = OperandDef.READ_SQL
table_or_sql = AnyField("table_or_sql")
selectable = BytesField(
"selectable", on_serialize=pickle.dumps, on_deserialize=pickle.loads
)
con = AnyField("con")
schema = StringField("schema")
index_col = AnyField("index_col")
coerce_float = BoolField("coerce_float")
parse_dates = AnyField("parse_dates")
columns = ListField("columns")
engine_kwargs = BytesField(
"engine_kwargs",
on_serialize=cloudpickle.dumps,
on_deserialize=cloudpickle.loads,
)
row_memory_usage = Float64Field("row_memory_usage")
method = StringField("method")
incremental_index = BoolField("incremental_index")
use_arrow_dtype = BoolField("use_arrow_dtype")
chunk_size = AnyField("chunk_size")
# for chunks
offset = Int64Field("offset")
partition_col = StringField("partition_col")
num_partitions = Int64Field("num_partitions")
low_limit = AnyField("low_limit")
high_limit = AnyField("high_limit")
left_end = BoolField("left_end")
right_end = BoolField("right_end")
nrows = Int64Field("nrows", default=None)
def get_columns(self):
return self.columns
def set_pruned_columns(self, columns, *, keep_order=None):
self.columns = columns
def _get_selectable(self, engine_or_conn, columns=None):
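        """
        Build and cache a SQLAlchemy selectable for ``table_or_sql``.

        A ``sqlalchemy.Table`` object is used directly; a table name is
        reflected from the database; when reflection fails, the value is
        treated as raw SQL and wrapped in an aliased subquery.
        """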
import sqlalchemy as sa
from sqlalchemy import sql
from sqlalchemy.exc import SQLAlchemyError
# process table_name
if self.selectable is not None:
selectable = self.selectable
else:
if isinstance(self.table_or_sql, sa.Table):
selectable = self.table_or_sql
self.table_or_sql = selectable.name
else:
m = sa.MetaData()
try:
selectable = sa.Table(
self.table_or_sql,
m,
autoload_replace=True,
autoload_with=engine_or_conn,
schema=self.schema,
)
except SQLAlchemyError:
temp_name_1 = "t1_" + binascii.b2a_hex(uuid.uuid4().bytes).decode()
temp_name_2 = "t2_" + binascii.b2a_hex(uuid.uuid4().bytes).decode()
if columns:
selectable = (
sql.text(self.table_or_sql)
.columns(*[sql.column(c) for c in columns])
.alias(temp_name_2)
)
else:
selectable = (
sql.select("*")
.select_from(
sql.text(f"({self.table_or_sql}) AS {temp_name_1}")
)
.alias(temp_name_2)
)
self.selectable = selectable
return selectable
def _collect_info(
self, engine_or_conn, selectable, columns, test_rows, use_arrow_dtype
):
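        """
        Fetch up to ``test_rows`` rows to infer dtypes and estimate per-row
        memory usage; for the "offset" method, also count the total number
        of rows to determine the output shape.
        """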
from sqlalchemy import sql
# fetch test DataFrame
if columns:
query = (
sql.select(*[sql.column(c) for c in columns])
.select_from(selectable)
.limit(test_rows)
)
else:
query = (
sql.select(selectable.columns).select_from(selectable).limit(test_rows)
)
# read_sql in pandas 1.5 does not support pyarrow.
sql_kwargs = arrow_dtype_kwargs() if use_arrow_dtype and is_pandas_2() else {}
test_df = pd.read_sql(
query,
engine_or_conn,
index_col=self.index_col,
coerce_float=self.coerce_float,
parse_dates=self.parse_dates,
**sql_kwargs,
)
if len(test_df) == 0:
self.row_memory_usage = None
else:
self.row_memory_usage = test_df.memory_usage(
deep=True, index=True
).sum() / len(test_df)
if self.method == "offset":
# fetch size
size = list(
engine_or_conn.execute(
sql.select(sql.func.count()).select_from(selectable)
)
)[0][0]
shape = (size, test_df.shape[1])
else:
shape = (np.nan, test_df.shape[1])
return test_df, shape
def __call__(self, test_rows, chunk_size):
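        """
        Normalize connection and column parameters, probe the database via
        ``_collect_info`` and create the output DataFrame tileable.
        """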
import sqlalchemy as sa
from sqlalchemy.sql import elements
with create_sa_connection(self.con, **(self.engine_kwargs or dict())) as con:
self.con = con.engine.url.render_as_string(hide_password=False)
selectable = self._get_selectable(con)
# process index_col
index_col = self.index_col
if index_col is not None:
if not isinstance(index_col, (list, tuple)):
index_col = (index_col,)
new_index_col = []
for col in index_col:
if isinstance(col, (sa.Column, elements.Label)):
new_index_col.append(col.name)
elif isinstance(col, str):
new_index_col.append(col)
elif col is not None:
raise TypeError(f"unknown index_col type: {type(col)}")
self.index_col = new_index_col
# process columns
columns = self.columns or []
new_columns = []
for col in columns:
if isinstance(col, str):
new_columns.append(col)
else:
new_columns.append(col.name)
self.columns = new_columns
if self.columns:
collect_cols = self.columns + (self.index_col or [])
else:
collect_cols = []
use_arrow_dtype = self.use_arrow_dtype
if use_arrow_dtype is None:
use_arrow_dtype = options.dataframe.use_arrow_dtype
test_df, shape = self._collect_info(
con, selectable, collect_cols, test_rows, use_arrow_dtype
)
# reconstruct selectable using known column names
if not collect_cols:
self.columns = list(test_df.columns)
if self.selectable is not None:
self.selectable = None
self._get_selectable(
con, columns=self.columns + (self.index_col or [])
)
if self.method == "partition":
if not self.index_col or self.partition_col not in self.index_col:
part_frame = test_df
else:
part_frame = test_df.index.to_frame()
if not issubclass(
part_frame[self.partition_col].dtype.type,
(np.number, np.datetime64),
):
                    raise TypeError(
                        "Type of partition column should be numeric or datetime, "
                        f"now it is {part_frame[self.partition_col].dtype}"
                    )
if isinstance(test_df.index, pd.RangeIndex):
index_value = parse_index(
pd.RangeIndex(shape[0] if not np.isnan(shape[0]) else -1),
str(selectable),
self.con,
)
else:
index_value = parse_index(test_df.index)
columns_value = parse_index(test_df.columns, store_data=True)
dtypes = test_df.dtypes
return self.new_dataframe(
None,
shape=shape,
dtypes=dtypes,
index_value=index_value,
columns_value=columns_value,
raw_chunk_size=chunk_size,
)
@classmethod
def _tile_offset(cls, op: "DataFrameReadSQL"):
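        """
        Split the result into chunks read with ``LIMIT``/``OFFSET``, sizing
        each chunk from the estimated per-row memory usage and the configured
        ``chunk_store_limit`` when no explicit chunk size is given.
        """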
df = op.outputs[0]
if op.row_memory_usage is not None:
# Data selected
chunk_size = df.extra_params.raw_chunk_size or options.chunk_size
if chunk_size is None:
chunk_size = (
int(options.chunk_store_limit / op.row_memory_usage),
df.shape[1],
)
row_chunk_sizes = normalize_chunk_sizes(df.shape, chunk_size)[0]
else:
# No data selected
row_chunk_sizes = (0,)
offsets = np.cumsum((0,) + row_chunk_sizes).tolist()
out_chunks = []
for i, row_size in enumerate(row_chunk_sizes):
chunk_op = op.copy().reset_key()
            chunk_op.row_memory_usage = None  # no need for chunk
offset = chunk_op.offset = offsets[i]
if df.index_value.has_value():
# range index
index_value = parse_index(
df.index_value.to_pandas()[offset : offsets[i + 1]]
)
else:
index_value = parse_index(
df.index_value.to_pandas(),
op.table_or_sql or str(op.selectable),
op.con,
i,
row_size,
)
out_chunk = chunk_op.new_chunk(
None,
shape=(row_size, df.shape[1]),
columns_value=df.columns_value,
index_value=index_value,
dtypes=df.dtypes,
index=(i, 0),
)
out_chunks.append(out_chunk)
nsplits = (row_chunk_sizes, (df.shape[1],))
new_op = op.copy()
return new_op.new_dataframes(
None, chunks=out_chunks, nsplits=nsplits, **df.params
)
def _parse_datetime(self, val):
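        """Parse a partition bound as a datetime, honoring the ``parse_dates``
        spec of ``partition_col`` when it is given as a dict."""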
if isinstance(self.parse_dates, list):
return pd.to_datetime(val)
args = self.parse_dates[self.partition_col]
args = {"format": args} if isinstance(args, str) else args
return pd.to_datetime(val, **args)
@classmethod
def _tile_partition(cls, op: "DataFrameReadSQL"):
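        """
        Split the result by value ranges of ``partition_col``: query the
        column's min/max when limits are not given, then divide the range
        into ``num_partitions`` equal-width intervals, one chunk per interval.
        """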
df = op.outputs[0]
selectable = op._get_selectable(None)
if op.low_limit is None or op.high_limit is None:
import sqlalchemy as sa
from sqlalchemy import sql
engine = sa.create_engine(op.con, **(op.engine_kwargs or dict()))
try:
part_col = selectable.columns[op.partition_col]
with engine.connect() as connection:
range_results = connection.execute(
sql.select(*[sql.func.min(part_col), sql.func.max(part_col)])
)
op.low_limit, op.high_limit = next(range_results)
if op.parse_dates and op.partition_col in op.parse_dates:
op.low_limit = op._parse_datetime(op.low_limit)
op.high_limit = op._parse_datetime(op.high_limit)
finally:
engine.dispose()
if isinstance(op.low_limit, (datetime.datetime, np.datetime64, pd.Timestamp)):
seps = pd.date_range(op.low_limit, op.high_limit, op.num_partitions + 1)
else:
seps = np.linspace(
op.low_limit, op.high_limit, op.num_partitions + 1, endpoint=True
)
out_chunks = []
for i, (start, end) in enumerate(zip(seps, seps[1:])):
chunk_op = op.copy().reset_key()
chunk_op.row_memory_usage = None # no need for chunk
chunk_op.num_partitions = None
chunk_op.low_limit = start
chunk_op.high_limit = end
chunk_op.left_end = i == 0
chunk_op.right_end = i == op.num_partitions - 1
if df.index_value.has_value():
# range index
                index_value = parse_index(
                    pd.RangeIndex(-1), chunk_op.key, df.index_value.key
                )
else:
index_value = parse_index(
df.index_value.to_pandas(), str(selectable), op.con, i
)
out_chunk = chunk_op.new_chunk(
None,
shape=(np.nan, df.shape[1]),
columns_value=df.columns_value,
index_value=index_value,
dtypes=df.dtypes,
index=(i, 0),
)
out_chunks.append(out_chunk)
nsplits = ((np.nan,) * len(out_chunks), (df.shape[1],))
new_op = op.copy()
return new_op.new_dataframes(
None, chunks=out_chunks, nsplits=nsplits, **df.params
)
@classmethod
def tile(cls, op: "DataFrameReadSQL"):
if op.method == "offset":
return cls._tile_offset(op)
else:
return cls._tile_partition(op)
@classmethod
def post_tile(cls, op: OperandType, results: List[TileableType]):
if op.method != "offset":
# method `offset` knows shape of each chunk
# just skip incremental process
return super().post_tile(op, results)
@classmethod
def execute(cls, ctx, op: "DataFrameReadSQL"):
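        """
        Execute a single chunk: build its query (an ``OFFSET``/``LIMIT``
        window or a ``partition_col`` range filter), order the rows for a
        stable result, then load them with ``pd.read_sql``.
        """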
import sqlalchemy as sa
def _adapt_datetime(dt):
if isinstance(dt, np.datetime64):
return dt.astype("<M8[ms]").astype(datetime.datetime)
elif isinstance(dt, pd.Timestamp):
return dt.to_pydatetime()
return dt
out = op.outputs[0]
engine = sa.create_engine(op.con, **(op.engine_kwargs or dict()))
try:
selectable = op._get_selectable(engine)
columns = [selectable.columns[col] for col in op.columns]
column_names = set(op.columns)
if op.index_col:
for icol in op.index_col:
if icol not in column_names:
columns.append(selectable.columns[icol])
# convert to python timestamp in case np / pd time types not handled
op.low_limit = _adapt_datetime(op.low_limit)
op.high_limit = _adapt_datetime(op.high_limit)
query = sa.sql.select(*columns)
if op.method == "partition":
part_col = selectable.columns[op.partition_col]
if op.left_end:
query = query.where(part_col < op.high_limit)
elif op.right_end:
query = query.where(part_col >= op.low_limit)
else:
query = query.where(
(part_col >= op.low_limit) & (part_col < op.high_limit)
)
if hasattr(selectable, "primary_key") and len(selectable.primary_key) > 0:
# if table has primary key, sort as the order
query = query.order_by(*list(selectable.primary_key))
elif op.index_col:
# if no primary key, sort as the index_col
query = query.order_by(
*[selectable.columns[col] for col in op.index_col]
)
else:
# at last, we sort by all the columns
query = query.order_by(*columns)
if op.method == "offset":
query = query.limit(out.shape[0])
if op.offset > 0:
query = query.offset(op.offset)
if op.nrows is not None:
query = query.limit(op.nrows)
use_arrow_dtype = op.use_arrow_dtype
if use_arrow_dtype is None:
use_arrow_dtype = options.dataframe.use_arrow_dtype
# read_sql in pandas 1.5 does not support pyarrow.
sql_kwargs = (
arrow_dtype_kwargs() if use_arrow_dtype and is_pandas_2() else {}
)
try:
df = pd.read_sql(
query,
engine,
index_col=op.index_col,
coerce_float=op.coerce_float,
parse_dates=op.parse_dates,
**sql_kwargs,
)
except AttributeError as e:
if "OptionEngine" in str(e):
import sqlalchemy as sa
raise AttributeError(
f"Your SQLAlchemy {sa.__version__} is too new for pandas {pd.__version__}"
) from e
raise e
if op.method == "offset" and op.index_col is None and op.offset > 0:
index = pd.RangeIndex(op.offset, op.offset + out.shape[0])
if op.nrows is not None:
index = index[: op.nrows]
df.index = index
if out.ndim == 2:
ctx[out.key] = df
else:
# this happens when column pruning results in one single series
ctx[out.key] = df.iloc[:, 0]
finally:
engine.dispose()
@classmethod
def post_execute(cls, ctx: Union[dict, Context], op: OperandType):
if op.method != "offset":
# method `offset` knows shape of each chunk
# just skip incremental process
return super().post_execute(ctx, op)
def _read_sql(
table_or_sql,
con,
schema=None,
index_col=None,
coerce_float=True,
params=None,
parse_dates=None,
columns=None,
chunksize=None,
incremental_index=False,
use_arrow_dtype=None,
test_rows=None,
chunk_size=None,
engine_kwargs=None,
partition_col=None,
num_partitions=None,
low_limit=None,
high_limit=None,
):
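    """Shared implementation behind ``read_sql``, ``read_sql_table`` and
    ``read_sql_query``."""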
if chunksize is not None:
raise NotImplementedError("read_sql_query with chunksize not supported")
method = "offset" if partition_col is None else "partition"
op = DataFrameReadSQL(
table_or_sql=table_or_sql,
selectable=None,
con=con,
schema=schema,
index_col=index_col,
coerce_float=coerce_float,
params=params,
parse_dates=parse_dates,
columns=columns,
engine_kwargs=engine_kwargs,
incremental_index=incremental_index,
use_arrow_dtype=use_arrow_dtype,
method=method,
partition_col=partition_col,
num_partitions=num_partitions,
low_limit=low_limit,
high_limit=high_limit,
chunk_size=chunk_size,
)
return op(test_rows, chunk_size)
def read_sql(
sql,
con,
index_col=None,
coerce_float=True,
params=None,
parse_dates=None,
columns=None,
chunksize=None,
test_rows=5,
chunk_size=None,
engine_kwargs=None,
incremental_index=True,
partition_col=None,
num_partitions=None,
low_limit=None,
high_limit=None,
):
"""
Read SQL query or database table into a DataFrame.
This function is a convenience wrapper around ``read_sql_table`` and
``read_sql_query`` (for backward compatibility). It will delegate
to the specific function depending on the provided input. A SQL query
will be routed to ``read_sql_query``, while a database table name will
    be routed to ``read_sql_table``. Note that the delegated function might
    have more specific notes about its functionality not listed here.
Parameters
----------
sql : str or SQLAlchemy Selectable (select or text object)
SQL query to be executed or a table name.
    con : SQLAlchemy connectable (engine/connection) or database str URI
        or DBAPI2 connection (fallback mode)
Using SQLAlchemy makes it possible to use any DB supported by that
library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible
for engine disposal and connection closure for the SQLAlchemy connectable. See
`here <https://docs.sqlalchemy.org/en/13/core/connections.html>`_
index_col : str or list of strings, optional, default: None
        Column(s) to set as index (MultiIndex).
coerce_float : bool, default True
Attempts to convert values of non-string, non-numeric objects (like
decimal.Decimal) to floating point, useful for SQL result sets.
params : list, tuple or dict, optional, default: None
List of parameters to pass to execute method. The syntax used
to pass parameters is database driver dependent. Check your
database driver documentation for which of the five syntax styles,
described in PEP 249's paramstyle, is supported.
Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
parse_dates : list or dict, default: None
- List of column names to parse as dates.
- Dict of ``{column_name: format string}`` where format string is
strftime compatible in case of parsing string times, or is one of
(D, s, ns, ms, us) in case of parsing integer timestamps.
- Dict of ``{column_name: arg dict}``, where the arg dict corresponds
to the keyword arguments of :func:`pandas.to_datetime`
Especially useful with databases without native Datetime support,
such as SQLite.
columns : list, default: None
List of column names to select from SQL table (only used when reading
a table).
chunksize : int, default None
If specified, return an iterator where `chunksize` is the number of rows
to include in each chunk. Note that this argument is only kept for
        compatibility. If a non-None value is passed, an error will be reported.
    test_rows : int, default 5
        The number of rows to fetch for inferring dtypes.
    chunk_size : int or tuple of ints, optional
        Specifies chunk size for each dimension.
    engine_kwargs : dict, default None
        Extra kwargs to pass to ``sqlalchemy.create_engine``.
    incremental_index : bool, default True
        If ``index_col`` is not specified, ensure that the range index is
        incremental; setting this to False gains slightly better performance.
    partition_col : str, default None
        Name of the column used to split the result of the query. If
        specified, the range ``[low_limit, high_limit]`` will be divided
        into ``num_partitions`` intervals of equal length. We do not
        guarantee that the resulting chunks have equal sizes. When the
        value is None, ``OFFSET`` and ``LIMIT`` clauses will be used to
        cut the result of the query.
num_partitions : int, default None
The number of chunks to divide the result of the query into,
when ``partition_col`` is specified.
    low_limit : default None
        The lower bound of the range of column ``partition_col``. If not
        specified, a query will be executed to fetch the minimum of the
        column.
    high_limit : default None
        The upper bound of the range of column ``partition_col``. If not
        specified, a query will be executed to fetch the maximum of the
        column.
Returns
-------
DataFrame
See Also
--------
read_sql_table : Read SQL database table into a DataFrame.
read_sql_query : Read SQL query into a DataFrame.
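    Examples
    --------
    A minimal sketch, assuming a local SQLite file ``mydb.db`` that contains
    a table named ``my_table`` (both names are illustrative only):

    >>> import mars.dataframe as md
    >>> md.read_sql('my_table', 'sqlite:///mydb.db') # doctest:+SKIP
    >>> md.read_sql('SELECT a, b FROM my_table', 'sqlite:///mydb.db') # doctest:+SKIP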
"""
return _read_sql(
table_or_sql=sql,
con=con,
index_col=index_col,
coerce_float=coerce_float,
params=params,
parse_dates=parse_dates,
columns=columns,
engine_kwargs=engine_kwargs,
incremental_index=incremental_index,
chunksize=chunksize,
test_rows=test_rows,
chunk_size=chunk_size,
partition_col=partition_col,
num_partitions=num_partitions,
low_limit=low_limit,
high_limit=high_limit,
)
def read_sql_table(
table_name,
con,
schema=None,
index_col=None,
coerce_float=True,
parse_dates=None,
columns=None,
chunksize=None,
test_rows=5,
chunk_size=None,
engine_kwargs=None,
incremental_index=True,
use_arrow_dtype=None,
partition_col=None,
num_partitions=None,
low_limit=None,
high_limit=None,
):
"""
Read SQL database table into a DataFrame.
Given a table name and a SQLAlchemy connectable, returns a DataFrame.
This function does not support DBAPI connections.
Parameters
----------
table_name : str
Name of SQL table in database.
    con : SQLAlchemy connectable or str
        A database URI could be provided as a str.
        SQLite DBAPI connection mode is not supported.
schema : str, default None
Name of SQL schema in database to query (if database flavor
supports this). Uses default schema if None (default).
index_col : str or list of str, optional, default: None
        Column(s) to set as index (MultiIndex).
coerce_float : bool, default True
Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Can result in loss of precision.
parse_dates : list or dict, default None
- List of column names to parse as dates.
- Dict of ``{column_name: format string}`` where format string is
strftime compatible in case of parsing string times or is one of
(D, s, ns, ms, us) in case of parsing integer timestamps.
- Dict of ``{column_name: arg dict}``, where the arg dict corresponds
to the keyword arguments of :func:`pandas.to_datetime`
Especially useful with databases without native Datetime support,
such as SQLite.
columns : list, default None
List of column names to select from SQL table.
chunksize : int, default None
If specified, returns an iterator where `chunksize` is the number of
rows to include in each chunk. Note that this argument is only kept
        for compatibility. If a non-None value is passed, an error will be
reported.
    test_rows : int, default 5
        The number of rows to fetch for inferring dtypes.
    chunk_size : int or tuple of ints, optional
        Specifies chunk size for each dimension.
    engine_kwargs : dict, default None
        Extra kwargs to pass to ``sqlalchemy.create_engine``.
    incremental_index : bool, default True
        If ``index_col`` is not specified, ensure that the range index is
        incremental; setting this to False gains slightly better performance.
    use_arrow_dtype : bool, default None
        If True, use Arrow dtypes to store columns.
    partition_col : str, default None
        Name of the column used to split the result of the query. If
        specified, the range ``[low_limit, high_limit]`` will be divided
        into ``num_partitions`` intervals of equal length. We do not
        guarantee that the resulting chunks have equal sizes. When the
        value is None, ``OFFSET`` and ``LIMIT`` clauses will be used to
        cut the result of the query.
num_partitions : int, default None
The number of chunks to divide the result of the query into,
when ``partition_col`` is specified.
    low_limit : default None
        The lower bound of the range of column ``partition_col``. If not
        specified, a query will be executed to fetch the minimum of the
        column.
    high_limit : default None
        The upper bound of the range of column ``partition_col``. If not
        specified, a query will be executed to fetch the maximum of the
        column.
Returns
-------
DataFrame
        A SQL table is returned as a two-dimensional data structure with labeled
axes.
See Also
--------
read_sql_query : Read SQL query into a DataFrame.
read_sql : Read SQL query or database table into a DataFrame.
Notes
-----
Any datetime values with time zone information will be converted to UTC.
Examples
--------
>>> import mars.dataframe as md
>>> md.read_sql_table('table_name', 'postgres:///db_name') # doctest:+SKIP
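
    A partitioned read, assuming the table has a numeric ``id`` column
    (the column name is illustrative only):

    >>> md.read_sql_table('table_name', 'postgres:///db_name',
    ...                   partition_col='id', num_partitions=10) # doctest:+SKIP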
"""
return _read_sql(
table_or_sql=table_name,
con=con,
schema=schema,
index_col=index_col,
coerce_float=coerce_float,
parse_dates=parse_dates,
columns=columns,
engine_kwargs=engine_kwargs,
incremental_index=incremental_index,
use_arrow_dtype=use_arrow_dtype,
chunksize=chunksize,
test_rows=test_rows,
chunk_size=chunk_size,
partition_col=partition_col,
num_partitions=num_partitions,
low_limit=low_limit,
high_limit=high_limit,
)
def read_sql_query(
sql,
con,
index_col=None,
coerce_float=True,
params=None,
parse_dates=None,
chunksize=None,
test_rows=5,
chunk_size=None,
engine_kwargs=None,
incremental_index=True,
use_arrow_dtype=None,
partition_col=None,
num_partitions=None,
low_limit=None,
high_limit=None,
):
"""
Read SQL query into a DataFrame.
Returns a DataFrame corresponding to the result set of the query
string. Optionally provide an `index_col` parameter to use one of the
columns as the index, otherwise default integer index will be used.
Parameters
----------
sql : str SQL query or SQLAlchemy Selectable (select or text object)
SQL query to be executed.
    con : SQLAlchemy connectable (engine/connection), database str URI,
or sqlite3 DBAPI2 connection
Using SQLAlchemy makes it possible to use any DB supported by that
library.
If a DBAPI2 object, only sqlite3 is supported.
index_col : str or list of strings, optional, default: None
        Column(s) to set as index (MultiIndex).
coerce_float : bool, default True
Attempts to convert values of non-string, non-numeric objects (like
decimal.Decimal) to floating point. Useful for SQL result sets.
params : list, tuple or dict, optional, default: None
List of parameters to pass to execute method. The syntax used
to pass parameters is database driver dependent. Check your
database driver documentation for which of the five syntax styles,
described in PEP 249's paramstyle, is supported.
Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
parse_dates : list or dict, default: None
- List of column names to parse as dates.
- Dict of ``{column_name: format string}`` where format string is
strftime compatible in case of parsing string times, or is one of
(D, s, ns, ms, us) in case of parsing integer timestamps.
- Dict of ``{column_name: arg dict}``, where the arg dict corresponds
to the keyword arguments of :func:`pandas.to_datetime`
Especially useful with databases without native Datetime support,
such as SQLite.
chunksize : int, default None
If specified, return an iterator where `chunksize` is the number of
rows to include in each chunk. Note that this argument is only kept
        for compatibility. If a non-None value is passed, an error will be
reported.
    incremental_index : bool, default True
        If ``index_col`` is not specified, ensure that the range index is
        incremental; setting this to False gains slightly better performance.
    use_arrow_dtype : bool, default None
        If True, use Arrow dtypes to store columns.
    test_rows : int, default 5
        The number of rows to fetch for inferring dtypes.
    chunk_size : int or tuple of ints, optional
        Specifies chunk size for each dimension.
    engine_kwargs : dict, default None
        Extra kwargs to pass to ``sqlalchemy.create_engine``.
    partition_col : str, default None
        Name of the column used to split the result of the query. If
        specified, the range ``[low_limit, high_limit]`` will be divided
        into ``num_partitions`` intervals of equal length. We do not
        guarantee that the resulting chunks have equal sizes. When the
        value is None, ``OFFSET`` and ``LIMIT`` clauses will be used to
        cut the result of the query.
num_partitions : int, default None
The number of chunks to divide the result of the query into,
when ``partition_col`` is specified.
    low_limit : default None
        The lower bound of the range of column ``partition_col``. If not
        specified, a query will be executed to fetch the minimum of the
        column.
    high_limit : default None
        The upper bound of the range of column ``partition_col``. If not
        specified, a query will be executed to fetch the maximum of the
        column.
Returns
-------
DataFrame
See Also
--------
read_sql_table : Read SQL database table into a DataFrame.
    read_sql : Read SQL query or database table into a DataFrame.
Notes
-----
Any datetime values with time zone information parsed via the `parse_dates`
parameter will be converted to UTC.
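    Examples
    --------
    A minimal sketch, assuming a SQLite database ``mydb.db`` with a table
    ``records`` that has a numeric ``id`` column (names are illustrative only):

    >>> import mars.dataframe as md
    >>> md.read_sql_query('SELECT * FROM records', 'sqlite:///mydb.db') # doctest:+SKIP
    >>> md.read_sql_query('SELECT * FROM records', 'sqlite:///mydb.db',
    ...                   partition_col='id', num_partitions=4) # doctest:+SKIP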
"""
return _read_sql(
table_or_sql=sql,
con=con,
index_col=index_col,
coerce_float=coerce_float,
params=params,
parse_dates=parse_dates,
engine_kwargs=engine_kwargs,
incremental_index=incremental_index,
use_arrow_dtype=use_arrow_dtype,
chunksize=chunksize,
test_rows=test_rows,
chunk_size=chunk_size,
partition_col=partition_col,
num_partitions=num_partitions,
low_limit=low_limit,
high_limit=high_limit,
)