TDengine/docs/examples/python/conn_native_pandas.py

115 lines
4.3 KiB
Python
Raw Permalink Normal View History

# ANCHOR: connect
from datetime import datetime
import pandas
from sqlalchemy import create_engine, text
from sqlalchemy.types import Integer, Float, TIMESTAMP, String
def connect():
    """Create a connection to TDengine using SQLAlchemy.

    Builds an engine from a hard-coded ``taos://`` URL (user ``root``,
    host ``localhost``, port ``6030``, ``timezone=Asia/Shanghai``) and
    opens a connection on it.

    Returns:
        The connection object returned by ``engine.connect()``. The caller
        is expected to close it when done.
    """
    # Plain literal: the URL contains no placeholders, so no f-prefix is needed.
    engine = create_engine("taos://root:taosdata@localhost:6030?timezone=Asia/Shanghai")
    conn = engine.connect()
    print("Connected to TDengine successfully.")
    return conn
# ANCHOR_END: connect
# ANCHOR: pandas_to_sql_example
def pandas_to_sql_example(conn):
    """Insert a small DataFrame into ``power.meters`` with ``DataFrame.to_sql()``.

    Ensures the ``power`` database and the ``power.meters`` super table
    exist, switches to that database, then appends three rows and checks
    that ``to_sql`` reports exactly three affected rows.

    Args:
        conn: An open SQLAlchemy connection to TDengine.

    Raises:
        Exception: Any error raised while preparing the schema, inserting
            the rows, or by the row-count assertion is printed and re-raised.
    """
    # Schema preparation statements, executed in this order.
    setup_statements = (
        "CREATE DATABASE IF NOT EXISTS power",
        "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)",
        "USE power",
    )
    try:
        for statement in setup_statements:
            conn.execute(text(statement))

        # The "ts" column deliberately mixes an integer, a string and a
        # datetime object. "tbname" is passed without an explicit dtype.
        frame = pandas.DataFrame({
            "ts": [1729653691000, "2024-09-19 10:00:00", datetime(2024, 9, 20, 10, 11, 12, 456)],
            "current": [11.5, 12.3, 13.7],
            "voltage": [220, 230, 240],
            "phase": [1.0, 1.1, 1.2],
            "location": ["california.losangeles", "california.sandiego", "california.sanfrancisco"],
            "groupid": [2, 2, 3],
            "tbname": ["california", "sandiego", "sanfrancisco"],
        })
        column_types = {
            "ts": TIMESTAMP,
            "current": Float,
            "voltage": Integer,
            "phase": Float,
            "location": String,
            "groupid": Integer,
        }
        inserted = frame.to_sql(
            "meters",
            conn,
            if_exists="append",
            index=False,
            dtype=column_types,
        )
        assert inserted == 3, f"Expected to insert 3 rows, affected {inserted} rows"
    except Exception as err:
        print(f"Failed to insert data into power.meters, ErrMessage:{err}")
        raise err
# ANCHOR_END: pandas_to_sql_example
# ANCHOR: pandas_read_sql_example
def pandas_read_sql_example(conn):
    """Query ``power.meters`` with ``pandas.read_sql()`` and print the first rows.

    Runs a parameterized SELECT that filters on ``current`` and ``phase``
    using named bind parameters, then prints up to three rows of the result.

    Args:
        conn: An open SQLAlchemy connection to TDengine.

    Raises:
        Exception: Any error raised while querying is printed and re-raised.
    """
    try:
        query = text("SELECT * FROM power.meters WHERE current > :current AND phase > :phase")
        # Values bound to the :current and :phase placeholders above.
        bind_values = {"current": 10, "phase": 1}
        result = pandas.read_sql(sql=query, con=conn, params=bind_values)
        print(result.head(3))
        print("Read data from TDengine successfully.")
    except Exception as err:
        print(f"Failed to read data from power.meters, ErrMessage:{err}")
        raise err
# ANCHOR_END: pandas_read_sql_example
# ANCHOR: pandas_read_sql_table_example
def pandas_read_sql_table_example(conn):
    """Read the ``meters`` table in chunks with ``pandas.read_sql_table()``.

    Requests the table with ``ts`` as a parsed-date index and a chunk size
    of 1000, then walks every chunk and prints each row field by field
    along with a running row number.

    Args:
        conn: An open SQLAlchemy connection to TDengine.

    Raises:
        Exception: Any error raised while reading is printed and re-raised.
    """
    try:
        wanted_columns = ['ts', 'current', 'voltage', 'phase', 'location', 'groupid']
        chunks = pandas.read_sql_table(
            table_name='meters',
            con=conn,
            index_col='ts',
            parse_dates=['ts'],
            chunksize=1000,  # optional, read data rows in chunks
            columns=wanted_columns,
        )
        row_no = 0
        chunk_no = 0
        for chunk in chunks:
            chunk_no += 1
            print(f"Processing chunk {chunk_no}")
            # "ts" is the index column, so it arrives as the row label.
            for ts, row in chunk.iterrows():
                row_no += 1
                print(f"no: {row_no}")
                print(f"ts: {ts}")
                for field in ("current", "voltage", "phase", "location", "groupid"):
                    print(f"{field}: {row[field]}")
        print("Read data from TDengine successfully using read_sql_table.")
    except Exception as err:
        print(f"Failed to read data from power.meters using read_sql_table, ErrMessage:{err}")
        raise err
# ANCHOR_END: pandas_read_sql_table_example
if __name__ == "__main__":
    # Run the examples in order on one shared connection, and always close
    # the connection afterwards if it was opened.
    conn = None
    try:
        conn = connect()
        examples = (
            pandas_to_sql_example,
            pandas_read_sql_example,
            pandas_read_sql_table_example,
        )
        for example in examples:
            example(conn)
    finally:
        if conn is not None:
            conn.close()