2024-08-02 08:20:50 +00:00
const taos = require ( "@tdengine/websocket" ) ;
const db = 'power' ;
const stable = 'meters' ;
2024-08-17 09:40:44 +00:00
const topic = 'topic_meters'
const topics = [ topic ] ;
const groupId = "group1" ;
const clientId = "client1" ;
2024-08-02 08:20:50 +00:00
// ANCHOR: create_consumer
async function createConsumer ( ) {
let configMap = new Map ( [
[ taos . TMQConstants . GROUP _ID , "group1" ] ,
[ taos . TMQConstants . CLIENT _ID , 'client1' ] ,
[ taos . TMQConstants . CONNECT _USER , "root" ] ,
[ taos . TMQConstants . CONNECT _PASS , "taosdata" ] ,
[ taos . TMQConstants . AUTO _OFFSET _RESET , "latest" ] ,
[ taos . TMQConstants . WS _URL , 'ws://localhost:6041' ] ,
[ taos . TMQConstants . ENABLE _AUTO _COMMIT , 'true' ] ,
[ taos . TMQConstants . AUTO _COMMIT _INTERVAL _MS , '1000' ]
] ) ;
try {
return await taos . tmqConnect ( configMap ) ;
2024-08-12 12:56:20 +00:00
} catch ( err ) {
2024-08-17 09:40:44 +00:00
console . error ( err ) ;
2024-08-02 08:20:50 +00:00
throw err ;
}
2024-08-12 12:56:20 +00:00
2024-08-02 08:20:50 +00:00
}
// ANCHOR_END: create_consumer
async function prepare ( ) {
let conf = new taos . WSConfig ( 'ws://localhost:6041' ) ;
conf . setUser ( 'root' ) ;
conf . setPwd ( 'taosdata' ) ;
conf . setDb ( 'power' ) ;
2024-08-15 05:37:06 +00:00
const createDB = ` CREATE DATABASE IF NOT EXISTS ${ db } KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1; ` ;
2024-08-02 08:20:50 +00:00
const createStable = ` CREATE STABLE IF NOT EXISTS ${ db } . ${ stable } (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int); ` ;
2024-08-12 12:56:20 +00:00
2024-08-02 08:20:50 +00:00
let wsSql = await taos . sqlConnect ( conf ) ;
await wsSql . exec ( createDB ) ;
await wsSql . exec ( createStable ) ;
let createTopic = ` CREATE TOPIC IF NOT EXISTS ${ topics [ 0 ] } AS SELECT * FROM ${ db } . ${ stable } ` ;
await wsSql . exec ( createTopic ) ;
for ( let i = 0 ; i < 10 ; i ++ ) {
await wsSql . exec ( ` INSERT INTO d1001 USING ${ stable } (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${ 10 + i } , ${ 200 + i } , ${ 0.32 + i } ) ` ) ;
}
2024-08-15 05:37:06 +00:00
await wsSql . close ( ) ;
2024-08-02 08:20:50 +00:00
}
// ANCHOR: subscribe
async function subscribe ( consumer ) {
try {
await consumer . subscribe ( [ 'topic_meters' ] ) ;
for ( let i = 0 ; i < 50 ; i ++ ) {
let res = await consumer . poll ( 100 ) ;
for ( let [ key , value ] of res ) {
2024-08-17 09:40:44 +00:00
// Add your data processing logic here
2024-08-12 14:13:57 +00:00
console . log ( ` data: ${ key } ${ value } ` ) ;
2024-08-02 08:20:50 +00:00
}
2024-08-12 12:56:20 +00:00
}
} catch ( err ) {
2024-08-17 09:40:44 +00:00
console . error ( ` Failed to poll data, topic: ${ topic } , groupId: ${ groupId } , clientId: ${ clientId } , ErrCode: ${ err . code } , ErrMessage: ${ err . message } ` ) ;
2024-08-02 08:20:50 +00:00
throw err ;
}
}
// ANCHOR_END: subscribe
// ANCHOR: offset
async function test ( ) {
let consumer = null ;
try {
await prepare ( ) ;
let consumer = await createConsumer ( )
await consumer . subscribe ( [ 'topic_meters' ] ) ;
let res = new Map ( ) ;
while ( res . size == 0 ) {
res = await consumer . poll ( 100 ) ;
2024-08-12 12:56:20 +00:00
}
2024-08-02 08:20:50 +00:00
let assignment = await consumer . assignment ( ) ;
await consumer . seekToBeginning ( assignment ) ;
2024-08-12 12:28:31 +00:00
console . log ( "Assignment seek to beginning successfully" ) ;
2024-08-02 08:20:50 +00:00
}
catch ( err ) {
2024-08-17 09:40:44 +00:00
console . error ( ` Failed to seek offset, topic: ${ topic } , groupId: ${ groupId } , clientId: ${ clientId } , ErrCode: ${ err . code } , ErrMessage: ${ err . message } ` ) ;
2024-08-02 08:20:50 +00:00
}
finally {
if ( consumer ) {
await consumer . close ( ) ;
}
taos . destroy ( ) ;
}
}
// ANCHOR_END: offset
test ( )