2024-09-25 09:03:25 +00:00
const { sleep } = require ( "@tdengine/websocket" ) ;
2024-07-03 08:33:55 +00:00
const taos = require ( "@tdengine/websocket" ) ;
2024-08-12 06:13:31 +00:00
// ANCHOR: create_consumer
2024-07-03 08:33:55 +00:00
// Database and super table names interpolated into the SQL statements below.
const db = 'power';
const stable = 'meters';
// WebSocket endpoint handed to the consumer configuration.
const url = 'ws://localhost:6041';
// Topic name used by this example; `topics` is the same name in list form.
const topic = 'topic_meters';
const topics = [topic];
// Consumer group id and client id used by this example.
const groupId = "group1";
const clientId = "client1";
2024-07-03 08:33:55 +00:00
/**
 * Builds the TMQ configuration map and opens a consumer via `taos.tmqConnect`.
 *
 * Reads the module-level `url`, `groupId` and `clientId` constants; `topic`
 * is only used in the failure log.
 *
 * @returns {Promise<object>} The value resolved by `taos.tmqConnect`.
 * @throws Rethrows whatever `taos.tmqConnect` rejects with, after logging it.
 */
async function createConsumer() {
    const configMap = new Map([
        [taos.TMQConstants.GROUP_ID, groupId],
        [taos.TMQConstants.CLIENT_ID, clientId],
        [taos.TMQConstants.CONNECT_USER, "root"],
        [taos.TMQConstants.CONNECT_PASS, "taosdata"],
        [taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
        [taos.TMQConstants.WS_URL, url],
        [taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
        [taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'],
    ]);
    try {
        // Declared with `const` so the handle stays local instead of leaking
        // into the global scope through a bare assignment.
        const conn = await taos.tmqConnect(configMap);
        console.log(` Create consumer successfully, host: ${url} , groupId: ${groupId} , clientId: ${clientId} `);
        return conn;
    } catch (err) {
        console.error(` Failed to create websocket consumer, topic: ${topic} , groupId: ${groupId} , clientId: ${clientId} , ErrCode: ${err.code} , ErrMessage: ${err.message} `);
        throw err;
    }
}
// ANCHOR_END: create_consumer
/**
 * Creates the database, super table and topic that the example relies on.
 *
 * Opens a SQL connection with `taos.sqlConnect`, runs the three CREATE
 * statements in order, and always closes the connection afterwards.
 *
 * @returns {Promise<void>}
 * @throws Propagates any rejection from `sqlConnect`, `exec` or `close`.
 */
async function prepare() {
    const conf = new taos.WSConfig(url);
    conf.setUser('root');
    conf.setPwd('taosdata');
    // NOTE(review): the database is selected on the connection before the
    // CREATE DATABASE statement runs — confirm the connector tolerates a
    // database that does not exist yet.
    conf.setDb(db);

    const createDB = ` CREATE DATABASE IF NOT EXISTS ${db} `;
    const createStable = ` CREATE STABLE IF NOT EXISTS ${db} . ${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int); `;
    const createTopic = ` CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db} . ${stable} `;

    const wsSql = await taos.sqlConnect(conf);
    try {
        await wsSql.exec(createDB);
        await wsSql.exec(createStable);
        await wsSql.exec(createTopic);
    } finally {
        // Release the connection even when one of the statements fails.
        await wsSql.close();
    }
}
2024-08-02 08:20:50 +00:00
2024-09-25 09:03:25 +00:00
/**
 * Inserts 50 rows into child table `d1001`, calling `sleep(100)` after each
 * row, then closes the connection.
 *
 * The INSERT statement names the super table without a database prefix, so
 * the connection is configured with `db` as its database.
 *
 * @returns {Promise<void>}
 * @throws Propagates any rejection from `sqlConnect`, `exec`, `sleep` or `close`.
 */
async function insert() {
    const conf = new taos.WSConfig(url);
    conf.setUser('root');
    conf.setPwd('taosdata');
    conf.setDb(db);

    const wsSql = await taos.sqlConnect(conf);
    try {
        for (let i = 0; i < 50; i++) {
            await wsSql.exec(` INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i} , ${200 + i} , ${0.32 + i} ) `);
            // Pause between rows (presumably 100 ms — confirm against the connector's `sleep`).
            await sleep(100);
        }
    } finally {
        // Release the connection even when an insert fails part-way through.
        await wsSql.close();
    }
}
/**
 * Subscribes the consumer to the example topic list, then polls 50 times,
 * logging every polled entry and committing after each poll.
 *
 * @param {object} consumer Consumer exposing `subscribe`, `poll` and `commit`.
 * @returns {Promise<void>}
 * @throws Rethrows any rejection from the consumer calls, after logging it.
 */
async function subscribe(consumer) {
    // ANCHOR: commit
    try {
        // Subscribe through the shared constant so the topic reported in the
        // error log below is the one actually subscribed to.
        await consumer.subscribe(topics);
        for (let i = 0; i < 50; i++) {
            const res = await consumer.poll(100);
            for (const [key, value] of res) {
                // Add your data processing logic here
                console.log(` data: ${key} ${value} `);
            }
            await consumer.commit();
            console.log("Commit offset manually successfully.");
        }
    } catch (err) {
        console.error(` Failed to poll data, topic: ${topic} , groupId: ${groupId} , clientId: ${clientId} , ErrCode: ${err.code} , ErrMessage: ${err.message} `);
        throw err;
    }
    // ANCHOR_END: commit
}
2024-09-26 02:52:17 +00:00
/**
 * Runs the example end to end: prepares the schema and topic, creates the
 * consumer, runs `subscribe` and `insert` concurrently, then unsubscribes.
 *
 * Cleanup always runs: the consumer (if one was created) is closed and
 * `taos.destroy()` is called even when closing the consumer fails.
 *
 * @returns {Promise<void>}
 * @throws Rethrows any failure from the steps above, after logging it.
 */
async function test() {
    // ANCHOR: unsubscribe
    let consumer = null;
    try {
        await prepare();
        consumer = await createConsumer();
        // Start consuming and producing together and wait for both to finish.
        await Promise.all([subscribe(consumer), insert()]);
        await consumer.unsubscribe();
        console.log("Consumer unsubscribed successfully.");
    } catch (err) {
        // This handler covers every step in the try block, not only unsubscribe.
        console.error(` Failed to unsubscribe consumer, topic: ${topic} , groupId: ${groupId} , clientId: ${clientId} , ErrCode: ${err.code} , ErrMessage: ${err.message} `);
        throw err;
    } finally {
        try {
            if (consumer) {
                await consumer.close();
                console.log("Consumer closed successfully.");
            }
        } finally {
            // Must run even if closing the consumer throws.
            taos.destroy();
        }
    }
    // ANCHOR_END: unsubscribe
}
// test() already logs the failure before rethrowing; handle the rejection
// here so the promise is not left floating and the process exits non-zero.
test().catch(() => {
    process.exitCode = 1;
});