TDengine/docs/examples/python/kafka_example_consumer.py
Linhe Huo e273a943ea
fix: add show connMode string (#30323)
* fix: add show connMode string

* fix: add stdbool.h to pub.h

* fix: remove trash file army/output.txt

* fix: caseBase.py modify syntax error

* fix: restore -R option for taosdump

* fix: taosdumpCommandline.py case

* fix: native stmt write normal table failed

* fix: taosdumpCommandline.py case passed

* fix: restore test.py from main branch

* fix: taosCli.py check default conn mode

* fix: commandline-sml.py case pass

* fix: websiteCase.py case passed

* fix: connMode.py case

* fix: modify default port is 0

* fix: taos_options with config dir not work

* fix: websocket.py delete -D timeout options

* fix: default_tmq_json.py context move to default_json.py, so delete

* fix python kafka bug

* chore: improve taos_init in wrapper

* chore: add installation path preparation in build workflow

* fix connMode bug

* fix: fix tmq conf/consumer new error in wrapperFunc.c

* fix: correct the spelling toss -> taosGetInstall...

* chore: fix compile error in wrapperFunc.c

* fix: createConnect fix memory leak

* fix: tsim forbid CHECK  ODR

* modify userOperTest to use static lib

* reverse userOperTest use static lib

---------

Co-authored-by: Alex Duan <417921451@qq.com>
Co-authored-by: taos-support <it@taosdata.com>
Co-authored-by: chris <zk662144@163.com>
Co-authored-by: t_max <1172915550@qq.com>
Co-authored-by: sheyanjie-qq <249478495@qq.com>
2025-03-22 20:44:07 +08:00

231 lines
9 KiB
Python

# -*- coding: utf-8 -*-
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, Future
from json import JSONDecodeError
from typing import Callable
import taos
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
import kafka_example_common as common
class Consumer(object):
    """Poll a Kafka topic and write every polled batch into TDengine with one INSERT.

    Settings are passed as keyword arguments and layered on top of
    ``DEFAULT_CONFIGS``. With ``testing=True`` no Kafka consumer is created
    (``self.consumer`` stays ``None``); the TDengine connection is always opened.
    """

    DEFAULT_CONFIGS = {
        'kafka_brokers': 'localhost:9092',  # kafka broker
        'kafka_topic': 'tdengine_kafka_practices',
        'kafka_group_id': 'taos',
        'taos_host': 'localhost',  # TDengine host
        'taos_port': 6030,  # TDengine port
        'taos_user': 'root',  # TDengine user name
        'taos_password': 'taosdata',  # TDengine password
        'taos_database': 'power',  # TDengine database
        'message_type': 'json',  # message format, 'json' or 'line'
        'clean_after_testing': False,  # if drop database after testing
        'max_poll': 1000,  # poll size for batch mode
        'workers': 10,  # thread count for multi-threading
        'testing': False
    }

    INSERT_SQL_HEADER = "insert into "
    INSERT_PART_SQL = '{} values (\'{}\', {}, {}, {})'

    def __init__(self, **configs):
        """Open the TDengine connection and, unless ``testing`` is set, the Kafka consumer.

        :param configs: overrides for the keys of ``DEFAULT_CONFIGS``
        """
        # Work on a private copy: updating DEFAULT_CONFIGS in place would leak
        # one instance's overrides into every Consumer created afterwards.
        self.config = dict(self.DEFAULT_CONFIGS)
        self.config.update(configs)

        self.consumer = None
        if not self.config.get('testing'):
            self.consumer = KafkaConsumer(
                self.config.get('kafka_topic'),
                bootstrap_servers=self.config.get('kafka_brokers'),
                group_id=self.config.get('kafka_group_id'),
            )

        self.conns = taos.connect(
            host=self.config.get('taos_host'),
            port=self.config.get('taos_port'),
            user=self.config.get('taos_user'),
            password=self.config.get('taos_password'),
            db=self.config.get('taos_database'),
        )

        # Always define both attributes so stop() never depends on how the
        # instance was configured; the pool itself exists only in multi-thread mode.
        self.pool = None
        self.tasks = []
        if self.config.get('workers') > 1:
            self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))

    def consume(self):
        """
        Select the database, then consume from Kafka until the polling loop ends.

        The handler is chosen by the ``message_type`` setting: ``'line'`` uses
        ``_line_to_taos`` and ``'json'`` uses ``_json_to_taos``. ``stop()`` is
        always called on the way out, including after a keyboard interrupt.
        :return:
        """
        self.conns.execute(common.USE_DATABASE_SQL.format(self.config.get('taos_database')))
        try:
            message_type = self.config.get('message_type')
            if message_type == 'line':  # line
                self._run(self._line_to_taos)
            elif message_type == 'json':  # json
                self._run(self._json_to_taos)
            else:
                # Make a misconfiguration visible instead of silently doing nothing.
                logging.error('## unsupported message_type [%s], nothing consumed', message_type)
        except KeyboardInterrupt:
            logging.warning("## caught keyboard interrupt, stopping")
        finally:
            self.stop()

    def stop(self):
        """
        Stop consuming and release the Kafka consumer, worker pool and TDengine connection.

        When ``clean_after_testing`` is set, the table and database are dropped
        before the connection is closed.
        :return:
        """
        # close consumer
        if self.consumer is not None:
            self.consumer.commit()
            self.consumer.close()

        # multi thread
        if self.pool is not None:
            # shutdown() waits for queued work by default; the loop below is a
            # cheap safety net for any future that is still running.
            self.pool.shutdown()
            for task in self.tasks:
                while not task.done():
                    time.sleep(0.01)

        # clean data
        if self.config.get('clean_after_testing'):
            self.conns.execute(common.DROP_TABLE_SQL)
            self.conns.execute(common.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
        # close taos
        if self.conns is not None:
            self.conns.close()

    @staticmethod
    def _log_task_failure(task: Future) -> None:
        """Log the exception of a finished worker task, if it raised one."""
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logging.error('## worker task failed: %s', error)

    def _run(self, f: Callable):
        """
        Poll Kafka in batches and hand each non-empty batch to ``f``.

        With more than one worker the batch is submitted to the thread pool,
        otherwise ``f`` is called inline. The loop returns after the fourth
        empty poll (counted over the whole run); the source marks this exit
        as test-only behavior.
        :param f: handler that receives the polled record lists
        :return:
        """
        i = 0  # just for test.
        while True:
            messages = self.consumer.poll(timeout_ms=100, max_records=self.config.get('max_poll'))
            if messages:
                batch = list(messages.values())
                if self.pool is not None:
                    future = self.pool.submit(f, batch)
                    # Without this callback an exception raised in a worker
                    # would disappear together with the discarded future.
                    future.add_done_callback(self._log_task_failure)
                    # Keep only unfinished futures so the list cannot grow
                    # without bound on a long-running consumer.
                    self.tasks = [task for task in self.tasks if not task.done()]
                    self.tasks.append(future)
                else:
                    f(batch)
            else:
                i += 1  # just for test.
                time.sleep(0.1)
                if i > 3:  # just for test.
                    logging.warning('## test over.')  # just for test.
                    return  # just for test.

    def _json_to_taos(self, messages):
        """
        Convert a batch of JSON records to one INSERT statement and execute it.

        Nothing is executed when the batch contains no usable record.
        :param messages: iterable of record lists; each record exposes ``.value``
        :return:
        """
        sql = self._build_sql_from_json(messages=messages)
        if not sql:
            logging.warning('## no valid json message in batch, insert skipped')
            return
        self.conns.execute(sql=sql)

    def _line_to_taos(self, messages):
        """
        Convert a batch of pre-formatted line records to one INSERT statement and execute it.

        Each record value is decoded and appended verbatim after ``insert into``.
        Nothing is executed for an empty batch.
        :param messages: iterable of record lists; each record exposes bytes in ``.value``
        :return:
        """
        lines = [
            message.value.decode()
            for partition_messages in messages
            for message in partition_messages
        ]
        if not lines:
            return
        sql = self.INSERT_SQL_HEADER + ' '.join(lines)
        self.conns.execute(sql=sql)

    def _build_single_sql_from_json(self, msg_value) -> str:
        """
        Build the ``<table> values (...)`` fragment for one JSON message.

        :param msg_value: JSON text of an object with the keys ``table_name``,
            ``ts``, ``current``, ``voltage`` and ``phase``
        :return: the SQL fragment, or ``''`` when the message cannot be used
        """
        try:
            data = json.loads(msg_value)
        except (JSONDecodeError, UnicodeDecodeError, TypeError) as e:
            logging.error('## decode message [%s] error: %s', msg_value, e)
            return ''
        if not isinstance(data, dict):
            # Valid JSON that is not an object has none of the expected fields.
            logging.error('## message [%s] is not a json object', msg_value)
            return ''
        # NOTE(review): the values are interpolated into SQL unescaped; this is
        # only safe while the Kafka topic is written by a trusted producer.
        ts = data.get('ts')
        current = data.get('current')
        voltage = data.get('voltage')
        phase = data.get('phase')
        table_name = data.get('table_name')
        return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)

    def _build_sql_from_json(self, messages) -> str:
        """
        Build one multi-table INSERT statement from a batch of JSON records.

        Records that fail to convert are left out.
        :param messages: iterable of record lists; each record exposes ``.value``
        :return: the full statement, or ``''`` when no record could be converted
        """
        sql_list = []
        for partition_messages in messages:
            for message in partition_messages:
                part = self._build_single_sql_from_json(message.value)
                if part:
                    sql_list.append(part)
        if not sql_list:
            return ''
        return self.INSERT_SQL_HEADER + ' '.join(sql_list)
def test_json_to_taos(consumer: Consumer):
    """Feed two hand-built JSON records through ``consumer._json_to_taos``.

    The records are wrapped in one inner list, and that list is the only
    element of the ``messages`` argument.

    :param consumer: Consumer whose TDengine connection is already usable
    """
    payloads = (
        {'table_name': 'd0',
         'ts': '2022-12-06 15:13:38.643',
         'current': 3.41,
         'voltage': 105,
         'phase': 0.02027, },
        {'table_name': 'd1',
         'ts': '2022-12-06 15:13:39.643',
         'current': 3.41,
         'voltage': 102,
         'phase': 0.02027, },
    )
    # Only the serialized value differs between the two records.
    batch = [
        ConsumerRecord(
            checksum=None, headers=None, offset=1, key=None,
            value=json.dumps(payload),
            partition=1, topic='test',
            serialized_key_size=None, serialized_header_size=None, serialized_value_size=None,
            timestamp=time.time(), timestamp_type=None, leader_epoch=0,
        )
        for payload in payloads
    ]
    consumer._json_to_taos(messages=[batch])
def test_line_to_taos(consumer: Consumer):
    """Feed two hand-built line-format records through ``consumer._line_to_taos``.

    Each record value is a UTF-8 encoded ``<table> values(...)`` fragment. Both
    records sit in one inner list, the only element of ``messages``.

    :param consumer: Consumer whose TDengine connection is already usable
    """
    fragments = (
        "d0 values('2023-01-01 00:00:00.001', 3.49, 109, 0.02737)",
        "d1 values('2023-01-01 00:00:00.002', 6.19, 112, 0.09171)",
    )
    # Only the encoded value differs between the two records.
    batch = [
        ConsumerRecord(
            checksum=None, headers=None, offset=1, key=None,
            value=fragment.encode('utf-8'),
            partition=1, topic='test',
            serialized_key_size=None, serialized_header_size=None, serialized_value_size=None,
            timestamp=time.time(), timestamp_type=None, leader_epoch=0,
        )
        for fragment in fragments
    ]
    consumer._line_to_taos(messages=[batch])
def consume(kafka_brokers, kafka_topic, kafka_group_id, taos_host, taos_port, taos_user,
            taos_password, taos_database, message_type, max_poll, workers):
    """Create a ``Consumer`` from the given settings and run its ``consume()`` loop.

    Every argument is forwarded to ``Consumer`` under the keyword of the same name.
    """
    settings = {
        'kafka_brokers': kafka_brokers,
        'kafka_topic': kafka_topic,
        'kafka_group_id': kafka_group_id,
        'taos_host': taos_host,
        'taos_port': taos_port,
        'taos_user': taos_user,
        'taos_password': taos_password,
        'taos_database': taos_database,
        'message_type': message_type,
        'max_poll': max_poll,
        'workers': workers,
    }
    Consumer(**settings).consume()
if __name__ == '__main__':
    # Smoke test: runs both conversion paths against a scratch database using
    # hand-built records; no Kafka consumer is created (testing=True).
    test_db = dict(host='localhost', port=6030, user='root', password='taosdata', db='py_kafka_test')

    consumer = Consumer(testing=True)
    try:
        common.create_database_and_tables(table_count=10, **test_db)
        try:
            consumer.conns.execute(common.USE_DATABASE_SQL.format('py_kafka_test'))
            test_json_to_taos(consumer)
            test_line_to_taos(consumer)
        finally:
            # Remove the scratch database even when one of the tests raised.
            common.clean(**test_db)
    finally:
        # The connection opened by Consumer was previously never closed.
        consumer.conns.close()