const taos = require("@tdengine/websocket");

// Database and super table names used by the SQL statements in this example.
const db = 'power';
const stable = 'meters';
// Topic names; the first entry is the topic created over the super table.
const topics = ['power_meters_topic'];
// ANCHOR: create_consumer
/**
 * Builds the TMQ configuration map and opens a consumer through
 * `taos.tmqConnect`.
 *
 * @returns {Promise<*>} Resolves with whatever `taos.tmqConnect` resolves to.
 * @throws Rethrows the error from `taos.tmqConnect` after logging it.
 */
async function createConsumer() {
    const configMap = new Map([
        [taos.TMQConstants.GROUP_ID, "group1"],
        [taos.TMQConstants.CLIENT_ID, 'client1'],
        [taos.TMQConstants.CONNECT_USER, "root"],
        [taos.TMQConstants.CONNECT_PASS, "taosdata"],
        [taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
        [taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
        [taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
        [taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'],
    ]);
    try {
        return await taos.tmqConnect(configMap);
    } catch (err) {
        // Log for the operator, then let the caller decide how to recover.
        console.error("Failed to create websocket consumer, ErrCode:" + err.code + "; ErrMessage: " + err.message);
        throw err;
    }
}
// ANCHOR_END: create_consumer
/**
 * Creates the database, super table and topic used by the example and
 * inserts ten sample rows into child table `d1001`.
 *
 * Every statement uses fully qualified names, so the connection does not
 * select a default database.
 * NOTE(review): selecting `power` at connect time presumably fails on a
 * fresh server because the database is only created below — confirm.
 *
 * @returns {Promise<void>}
 * @throws Propagates any error raised while connecting or executing SQL.
 */
async function prepare() {
    const conf = new taos.WSConfig('ws://localhost:6041');
    conf.setUser('root');
    conf.setPwd('taosdata');

    const createDB = `CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
    const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
    const createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;

    const wsSql = await taos.sqlConnect(conf);
    try {
        await wsSql.exec(createDB);
        await wsSql.exec(createStable);
        await wsSql.exec(createTopic);

        for (let i = 0; i < 10; i++) {
            await wsSql.exec(`INSERT INTO ${db}.d1001 USING ${db}.${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
        }
    } finally {
        // Release the connection even when one of the statements fails.
        await wsSql.close();
    }
}
/**
 * Subscribes the consumer to the module-level `topics`, then polls 50 times,
 * printing every entry of each poll result and committing after each poll.
 *
 * @param {*} consumer - Object exposing `subscribe`, `poll` and `commit`
 *   (as returned by `createConsumer`).
 * @returns {Promise<void>}
 * @throws Rethrows any error from subscribing, polling or committing after
 *   logging it.
 */
async function subscribe(consumer) {
    // ANCHOR: commit
    try {
        // Subscribe to the topic that prepare() actually creates.
        await consumer.subscribe(topics);
        for (let i = 0; i < 50; i++) {
            const res = await consumer.poll(100);
            for (const [key, value] of res) {
                console.log(key, value);
            }
            // Await the commit so a failure lands in the catch below instead
            // of becoming an unhandled rejection.
            await consumer.commit();
        }
    } catch (err) {
        console.error("Failed to poll data, ErrCode:" + err.code + "; ErrMessage: " + err.message);
        throw err;
    }
    // ANCHOR_END: commit
}
/**
 * Runs the whole example: prepares the data, creates a consumer, consumes
 * the topic and unsubscribes. Errors are logged, not rethrown.
 *
 * The consumer (if one was created) is closed and `taos.destroy()` is called
 * in the `finally` block regardless of the outcome.
 *
 * @returns {Promise<void>}
 */
async function test() {
    // ANCHOR: unsubscribe
    let consumer = null;
    try {
        await prepare();
        // Assign to the outer binding; redeclaring it here would shadow it
        // and leave the `finally` block with nothing to close.
        consumer = await createConsumer();
        await subscribe(consumer);
        await consumer.unsubscribe();
    } catch (err) {
        console.error("Failed to unsubscribe consumer, ErrCode:" + err.code + "; ErrMessage: " + err.message);
    } finally {
        if (consumer) {
            await consumer.close();
        }
        taos.destroy();
    }
    // ANCHOR_END: unsubscribe
}
// Entry point. test() logs errors from its try block itself; this handler
// only covers a rejection raised by the cleanup in its finally block.
test().catch((err) => {
    console.error("Example terminated with an unhandled error:", err);
    process.exitCode = 1;
});