TDengine/source/libs/executor/src/timewindowoperator.c
Pan Wei f0df7a26d7
Feat/ts 6100 3.0.0722m (#32103)
* migrate system-test/2-query

* revert file

* update case.task

* resolve script migrate

* run new test framework on new_testcases

* migrate system-test/2-query

* format docstring

* fix test validation

* fix test validation

* fix error

* migrate army case

* migrate army case

* fix error

* migrate system-test/2-query

* migrate system-test/2-query

* migrate system-test/2-query

* migrate system-test/2-query

* test exe time

* fix ci error

* migrate system-test/1-insert

* new common function

* migrate system-test/1-insert

* fix ci error

* migrate system-test/1-insert

* feat: add configuration and script for memory allocator settings

* fix: correct HEAPPROFILE path and remove redundant metadata_thp setting in memory allocator script

* fix ci error

* migrate system-test/1-insert, 2-query

* feat:insert into subquery (#31401) (#31710)

* migrate system-test/99-TDcase

* feat(gpt): add grant check for gpt. (#31708)

* migrate system-test/99-TDcase

* migrate system-test/99-TDcase

* migrate system-test/99-TDcase

* fix/send-heartbeat-statis (#31680)

* migrate system-test/7-tmq

* chore: support cmake option TAOSWS_GIT_TAG like taosadapter [skip ci] (#31486)

* add system-test/6/cluster test

* chore: move default branch from main to 3.3.6 for adapter/taosws

* fix docstring validation

* migrate system-test/7-tmq

* migrate system-test/7-tmq

* feat: add set_taos_malloc_env configuration and update related scripts

* Update 03-kubernetes.md

* enh: add log for snapshot (#31681)

* simple test

* more

* migrate system-test/7-tmq

* migrate system-test 0-others cases

* migrate system-test/7-tmq

* fix docstring validation

* migrate system-test/7-tmq

* fix docstring validation

* fix: invalid queue

* fix docstring validation

* refactor: reorganize memory allocator script constants and improve mode descriptions

* enh: TD-36324-improve-sync-heartbeat-log (#31727)

* recover log level

* fix: taosd crush in query when insufficient memory (#31746)

Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>

* fix: overflow check in snprintf (#31780)

* migrate system-test/7-tmq

* migrate some cases

* migrate system-test/7-tmq

* fix failed cases

* fix failed cases

* migrate some cases

* fix failed cases

* fix failed cases

* migrate testcases

* fix: update environment file path in taosd.service and adjust set_taos_malloc.sh configuration

* fix: set lcn before do s3 migrate. (#31782)

* migrate testcases

* fix failed cases

* fix: add condition to set default malloc config for taosd service

* fix: update usage message in set_taos_malloc.sh to include quiet mode option

* fix: add quiet mode option to set malloc config in install script

* fix: update set_taos_malloc.sh to improve output messages and adjust default malloc config invocation

* recover log level

* fix(tmq): [TS-6569]tdb error if write tmq meta data in multi thread (#31808)

* fix: the calculation of dnode uptime (#31832)

* migrate testcase

* fix: correct timediff function bug and redress docs (#31798)

* fix: enhance malloc configuration for taosd and taosadapter in install script

* fix: TD-36442 show full condition (#31796)

* enh/TD-36466-sync-heartbeat (#31805)

* fix(plan) virtual table support BI moudle when use in select (#31787)

* fix failed cases

* enh: TS-5926-force-repair-wal (#31828)

* migrate testcase

* rename taos & taosd (#31855)

* migrate testcase

* migrate testcase

* fix failed cases

* fix: update environment file paths in taosd.service and set_taos_malloc.sh

* feat: add performance tuning documentation for memory optimization and set_taos_malloc.sh usage

* fix: update file paths in performance tuning documentation for consistency

* feat: add performance tuning documentation for memory allocator configuration script

* fix: add missing line breaks for improved readability in performance tuning documentation

* fix: adjust heading levels for consistency in performance tuning documentation

* fix cases

* fix new_testcases

* fix failed cases

* fix(query): support show tags on virtual table (#31831)

* fix failed cases

* docs: replace mysql screenshot (#31888)

* feat: use the new TDengine product name (#31859)

* merge 3.0

* fix failed cases

* fix failed cases

* merge 3.0

* fix cases

* fix cases

* doc: Update 03-stream.md (#31675)

* chore(deps): bump requests from 2.27.1 to 2.32.4 in /test (#31326)

Bumps [requests](https://github.com/psf/requests) from 2.27.1 to 2.32.4.
- [Release notes](https://github.com/psf/requests/releases)
- [Changelog](https://github.com/psf/requests/blob/main/HISTORY.md)
- [Commits](https://github.com/psf/requests/compare/v2.27.1...v2.32.4)

---
updated-dependencies:
- dependency-name: requests
  dependency-version: 2.32.4
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* fix new cases

* fix:Convert line endings from LF to CRLF for ans file

* build(deps): bump golang.org/x/net in /tools/keeper (#30811)

Bumps [golang.org/x/net](https://github.com/golang/net) from 0.36.0 to 0.38.0.
- [Commits](https://github.com/golang/net/compare/v0.36.0...v0.38.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-version: 0.38.0
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump urllib3 from 1.26.20 to 2.5.0 in /test (#31414)

Bumps [urllib3](https://github.com/urllib3/urllib3) from 1.26.20 to 2.5.0.
- [Release notes](https://github.com/urllib3/urllib3/releases)
- [Changelog](https://github.com/urllib3/urllib3/blob/main/CHANGES.rst)
- [Commits](https://github.com/urllib3/urllib3/compare/1.26.20...2.5.0)

---
updated-dependencies:
- dependency-name: urllib3
  dependency-version: 2.5.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump org.apache.tomcat.embed:tomcat-embed-core (#31392)

Bumps org.apache.tomcat.embed:tomcat-embed-core from 9.0.104 to 9.0.106.

---
updated-dependencies:
- dependency-name: org.apache.tomcat.embed:tomcat-embed-core
  dependency-version: 9.0.106
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump org.apache.tomcat.embed:tomcat-embed-core (#31393)

Bumps org.apache.tomcat.embed:tomcat-embed-core from 9.0.104 to 9.0.106.

---
updated-dependencies:
- dependency-name: org.apache.tomcat.embed:tomcat-embed-core
  dependency-version: 9.0.106
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump org.apache.kafka:kafka-clients (#31341)

Bumps org.apache.kafka:kafka-clients from 3.9.0 to 3.9.1.

---
updated-dependencies:
- dependency-name: org.apache.kafka:kafka-clients
  dependency-version: 3.9.1
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* fix: add lock when calculating log buffer start/end (#31883)

* fix new cases

* fix new cases

* fix failed cases

* feat: new stream (#31678)

* fix: windows compile issue

* test: add vtable cases (#31829)

* fix: windows compile issues

* test:add test cases

* fix: windows compile issue

* case: em-4 stream case submit

* test: stream4_sub1 found bug2

* test: submit test_scene_meters_bug2.py

* add stream parameters example

* feat: [TS-6100] Do not translate const value as column.

* Feat/ts 6100 3.0 zlv (#31747)

* modify asan exampel

* modify asan exampel

* add example

* add example

* modify case example

---------

Co-authored-by: zelv01 <1101510017@qq.com>

* feat(stream): fix memory leak

* modify sliding example

* test: update test case.

* feat(stream): fix conflicts

* fix: add offset case 10a 10s 10m 10h 10d

* feat(stream): fix conflicts

* chore(stream): rename case name #TS-6100

* add case

* modify example

* fix: windows compile issues

* fix: data null check

* feat: [TS-6100] Forbid where when using %%trows (#31827)

* feat: [TS-6100] Forbid where when using %%trows

* test: update cases

* feat: [TS-6100] Fix leaks.

---------

Co-authored-by: Simon Guan <guanshengliang@qq.com>

* test: reproduce bugs

* test: update test case.

* test: update test case.

* feat: [TS-6100] Fix leaks.

* test: add cases

* Feat/ts 6100 3.0.pw10 (#31841)

* enh: add operator reset func

* fix: merge join reset issue

* fix: memory issues

* fix: add debug assert

* fix: memory issues

* fix: memory leak

* fix: memory issues

* fix taos log miss

* fix: case issue

* fix: case issue

* fix: case issues

* fix: drop dnode issue

* fix: memory issues

* fix: memory issues

* fix: memory leak issues

* fix: recalculate time range issue

* fix: add debug log

* fix: memory issues

* fix: enable case asan

* Update streamlist_for_ci.task

* fix: case asan issue

* fix: stream name issue

* fix: external window compile issues

* fix: deploy memory issue

* fix: ahandle issue

* fix: ahandle issue

* fix: ahandle issue

* fix: virtual table reader list issue

* fix: log info

* fix: msg error

* fix: virtual table addr list issue

* fix: memory issues

* fix: memory leak issue

* fix: memory issues

* fix: memory free issues

* fix: memory issues

* fix: snode deploy issue

* fix: mnode reader issue

* fix: memory issues

* fix: add debug test

* enh: add ignore nodata trigger

* fix: memory leaks

* fix: configuration issue

* fix: memory issue

* fix: external window issue

* fix: external window issues

* fix: external window placeholder issue

* fix: placeholder function init issues

* fix: memory leak issue

* fix: add debug log

* fix: compile issues

* fix: double free issue

* fix: runner addr update issue

* fix: msg rsp issue

* fix: external window reset issue

* fix: configuration issue

* fix: deploy msg issue

* fix: compile issue

---------

Co-authored-by: huohong <sallyhuo@taosdata.com>

* test: reproduce bugs

* fix: add sliding interval combine case

* test: add cases

* test: add recalc test.

* test: reproduce bugs

* case : add vt ts is null check

* modify case

* bug: submit test_idmp_meters_bug3.py

* test: add test for recalc.

* test: add cases

* fix: error code check

* test: add cases

* fix(stream): scan wal with schema in that version

* add case

* test: add cases

* test: update test case.

* fix: windows compile issues

* add case

* test: add cases (#31845)

* modify case

* fix: reset interpPrev

* test: add test_idmp_meters bug4 and bug3

* add case

* fix(stream): opti wal interface

* fix: remove test_idmp_meters_bug5.py

* test: add cases

* fix(stream): fix ts data fetch for virtual tables

* cancel asan case

* test: update test case.

* test: update test case.

* add case

* test: add cases

* test: add cases

* test: add case test_idmp_meters_bug5.py

* test: update test case.

* fix(stream): tmq error

* test: add cases

* feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use.

* fix(stream): optimize val scan logic

* test: add test_recalc_expired_time.py  to ci.

* test: update test case.

* test: update test case.

* feat: [TS-6100] Fix fill range check

* fix(stream): optimize val scan logic

* add case

* test: modify for partition by %%1

* test: add fun case stream4_sub7

* fix(stream): optimize val scan logic

* add case

* feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS.

* test: add test for recalc.

* test: use stream_options.

* fix: some cases error.

* test: remove recalc from ci.

* fix: ci case issues (#31880)

* enh: add operator reset func

* fix: merge join reset issue

* fix: memory issues

* fix: add debug assert

* fix: memory issues

* fix: memory leak

* fix: memory issues

* fix taos log miss

* fix: case issue

* fix: case issue

* fix: case issues

* fix: drop dnode issue

* fix: memory issues

* fix: memory issues

* fix: memory leak issues

* fix: recalculate time range issue

* fix: add debug log

* fix: memory issues

* fix: enable case asan

* Update streamlist_for_ci.task

* fix: case asan issue

* fix: stream name issue

* fix: external window compile issues

* fix: deploy memory issue

* fix: ahandle issue

* fix: ahandle issue

* fix: ahandle issue

* fix: virtual table reader list issue

* fix: log info

* fix: msg error

* fix: virtual table addr list issue

* fix: memory issues

* fix: memory leak issue

* fix: memory issues

* fix: memory free issues

* fix: memory issues

* fix: snode deploy issue

* fix: mnode reader issue

* fix: memory issues

* fix: add debug test

* enh: add ignore nodata trigger

* fix: memory leaks

* fix: configuration issue

* fix: memory issue

* fix: external window issue

* fix: external window issues

* fix: external window placeholder issue

* fix: placeholder function init issues

* fix: memory leak issue

* fix: add debug log

* fix: compile issues

* fix: double free issue

* fix: runner addr update issue

* fix: msg rsp issue

* fix: external window reset issue

* fix: configuration issue

* fix: deploy msg issue

* fix: compile issue

* fix: external window idx issue

* fix: ci issues

---------

Co-authored-by: huohong <sallyhuo@taosdata.com>

* fix(stream): fix compilation error

* fix(stream): optimize val scan logic

* test:add test cases

* test: modify case

* fix: external agg error

* test(stream): tobacco scene testing #TD-36514

* test: add stream cases (#31885)

* fix: windows compile issue

* fix: calc timerange

* fix: windows compile issue

* modify case

* fix(stream): compile error

* test: remove one debug test case file

* test: modify

* test: add test cases

* test: reproduce bugs

* test: reproduce bugs

* feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868)

* feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM.

* test: update case

---------

Co-authored-by: Simon Guan <guanshengliang@qq.com>

* add example

* add example

* modify case example

* modify case

* test:alter sql

* test: add stream5 case

* fix(stream): get schema error with version

* test: add delete recalc test py.

* test: remove bug cases

* test: stream5 case test passed

* test: add state cases (#31893)

* fix(stream): compile error

* test: modify case

* test: add cases

* test: add test.

* test: update test case.

* chore(test): fix case err

* test: update test case.

* fix: align data get

* fix(stream): fix row index of datablock written into data cache

* fix: put align data

* test: update test case.

* test: add test cases for virtual table

* chore(test): fix case err #TD-36514

* add case

* test: add test for water mark.

* test: add meters bug6 for stream5

* test: add cases (#31903)

* test: add test for recalc.

* feat: [TS-6100] %%trows can only be used when event type is window close.

* test: add precision of database for ms/us/ns

* modify case

* add case

* add case

* test: add test to ci.

* modify case

* fix: ci case issues (#31904)

* enh: add operator reset func

* fix: merge join reset issue

* fix: memory issues

* fix: add debug assert

* fix: memory issues

* fix: memory leak

* fix: memory issues

* fix taos log miss

* fix: case issue

* fix: case issue

* fix: case issues

* fix: drop dnode issue

* fix: memory issues

* fix: memory issues

* fix: memory leak issues

* fix: recalculate time range issue

* fix: add debug log

* fix: memory issues

* fix: enable case asan

* Update streamlist_for_ci.task

* fix: case asan issue

* fix: stream name issue

* fix: external window compile issues

* fix: deploy memory issue

* fix: ahandle issue

* fix: ahandle issue

* fix: ahandle issue

* fix: virtual table reader list issue

* fix: log info

* fix: msg error

* fix: virtual table addr list issue

* fix: memory issues

* fix: memory leak issue

* fix: memory issues

* fix: memory free issues

* fix: memory issues

* fix: snode deploy issue

* fix: mnode reader issue

* fix: memory issues

* fix: add debug test

* enh: add ignore nodata trigger

* fix: memory leaks

* fix: configuration issue

* fix: memory issue

* fix: external window issue

* fix: external window issues

* fix: external window placeholder issue

* fix: placeholder function init issues

* fix: memory leak issue

* fix: add debug log

* fix: compile issues

* fix: double free issue

* fix: runner addr update issue

* fix: msg rsp issue

* fix: external window reset issue

* fix: configuration issue

* fix: deploy msg issue

* fix: compile issue

* fix: external window idx issue

* fix: ci issues

* fix: ci case issues

* fix: drop dnode issue

---------

Co-authored-by: huohong <sallyhuo@taosdata.com>

* fix(stream): ci error

* test: update test case.

* feat: [TS-6100] Disable some failed UT.

* feat: [TS-6100] Fix virtual table

* test: add bug 5.

* test: add test delete recalc to ci.

* test: add bug 6.

* test(stream): tobacco scene #TD-36514

* fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext

Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>

* test: add case stream6

* fix(stream): implement some pending features in trigger task

* modify case

* modify case

* fix: case issues

* modify case

* test: add recalc for warter mark.

* fix(stream): fix count window trigger of virtual tables

* fix(stream): memory leak

* test: fix run err.

* test: add stream6 bug7

* fix: adjust format

* test(stream): tobacco scene testing #TD-36514

* test: change bug7 with update window1 and 2

* test: add test bug 7.

* case: restore write 3 window

* fix: windows compile issue

* fix: notify

* test: add cases

* modify case

* test: update test case.

* test(stream): toobacco scene testing #TD-36514

---------

Co-authored-by: Simon Guan <slguan@taosdata.com>
Co-authored-by: plum-lihui <huili@taosdata.com>
Co-authored-by: Alex Duan <417921451@qq.com>
Co-authored-by: zelv01 <1101510017@qq.com>
Co-authored-by: Jing Sima <simondominic9997@outlook.com>
Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com>
Co-authored-by: wangmm0220 <wangmm0220@gmail.com>
Co-authored-by: Haojun Liao <hjliao@taosdata.com>
Co-authored-by: zyyang90 <zyyang@taosdata.com>
Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com>
Co-authored-by: facetosea <285808407@qq.com>
Co-authored-by: Simon Guan <guanshengliang@qq.com>
Co-authored-by: huohong <sallyhuo@taosdata.com>
Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com>
Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com>
Co-authored-by: xiao-77 <berylbao@taosdata.com>
Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com>
Co-authored-by: happyguoxy <happy_guoxy@163.com>
Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com>
Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>

* test: rename TSDB

* docs: fix rust examples (#31908)

* docs: modify rust native test case

* docs: modify rust ws test case

* docs: modify rust examples

* docs: update rust pool docs

* fix new cases

* migrate test case

* feat: support reading sub table names and tag values from CSV files to create sub tables (#31909)

* feat: add obtaining table names from tag files

* feat: add write data table control

* feat: add table params to write

* feat: delete log file

* feat: modify test case csv path

* feat: resolve memory leakage in the table building thread

* feat: resolve compilation errors

* feat: resolve table name copy len error

* feat: modify create table log level

* feat: modifying query configuration parameter array out of bounds

* feat: support custom primary key names

* feat: modify log level

* feat: add set primary key name case

* feat: add column  keywords case

* feat: add keywords case data

* feat: modify primaryKeyName value len

* feat: modify primaryKeyName value define

* feat: modify primaryKeyName value size

* fix: compile issue (#31943)

Co-authored-by: taos-support <it@taosdata.com>

* package: fix error

* package: fix error

* fix failed cases

* merge 3.0

* rename create_table_keywords.py to test_create_table_keywords.py

* fix failed cases

* fix new cases

* docs: update stream (#31957)

* docs: update jdbc out-dated descripiton (#31959)

* fix: TD-36560 refactor arbitrator group function name and log (#31852)

* feat: support BLOB data type (#31704)

* rename 0-others/mounts.py to 0-others/test_mounts.py

* fix failed cases

* docs: update gpt (#31975)

* fix failed cases

* fix failed cases

* package: fix error

* feat: add taosBenchmark command line parameters (#31967)

* feat: add command line parameters

* feat: add command line parameter test cases

* fix: tableName len error

* enh: set TD Release build in tdengine-build.yml

* Update tdengine-build.yml

* fix: add json file path log

* fix: streamline TD_CONFIG export in build steps

* fix: Restore the build configuration

---------

Co-authored-by: haoranchen <haoran920c@163.com>

* fix: tableName len error (#31977)

* fix: tableName len error

* fix: modify TD_CONFIG=Release

* fix: code format

* fix: Restore the build configuration

* enh: set TD Release build in tdengine-build.yml (#31980)

* enh: set TD Release build in tdengine-build.yml

* Update tdengine-build.yml

* fix: update cache key for externals to include debug build version

* fix: remove verbose flag from build commands in tdengine-build.yml

* skip memleak cases

* fix failed case

* fix failed cases

* package: fix error

* fix(stmt2):tbname error output (#31997)

* fix: possible memory leak (#31972)

* feat: create connect add dbname params (#32002)

* feat: create connect add dbname params

* fix: connect param error

* skip failed cases

* fix cases on windows

* fix cases

* support connect bi mode and fix log level

* unique sql connect username and password

* fix log level

* enh: mounted vnode may have no tq (#31916)

* fix: subquery memleak (#32024)

* fix failed case

* fix cases

* rename 2-query/test_insert_select.py to 2-query/test_system_insert_select.py

* skip memleak cases

* enh: rename data forecast/detect to forecasting/anomaly detection (#32021)

* package: unique product name

* package: update for main

* skip tsim cases

* chore: update jdbc connection pool validation query sql (#32056)

* refactor: quotes usage in bash scripts

Signed-off-by: WANG Xu <feici02@outlook.com>

* enhn(mqtt/rawblock): new format for msg payload (#31801)

* fix: fix broken link in 14-stream.md

* docs: 15-spark.md is missing end semicolon (#32068)

* chore: bump dev version to 3.3.7.0.alpha (#32066)

* fix: blob test (#32020)

* fix blob query error

* fix blob query error

* fix blob query error

* fix blob query error

* fix blob query error

* opt query

* opt write

* opt write

* opt write

* opt bse

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* add cache

* opt query

* opt query

* opt bse

* add data iter

* add data iter

* add more compress

* add more compress

* add more compress

* add more compress

* add more compress

* add more compress

* opt blob transfer

* opt blob transfer

* opt blob transfer

* opt write

* avoid unordered data write

* avoid unordered data write

* opt read

* refactor log

* fix invalid write

* refactor code

* fix merge error

* fix merge error

* add error code

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* Merge remote-tracking branch 'origin/3.0' into enh/blob

* refactor code

* support blob type len

* refactor code

* refactor code

* benchmark support blob type

* benchmark support blob type

* add log

* handle sort and merge row

* change file set

* change file set

* change file set

* change file set

* change file set

* change file set

* change file set

* opt code

* opt read

* add restart error and add unit test

* add restart error and add unit test

* add restart error and add unit test

* add restart error and add unit test

* add restart error and add unit test

* refactor code

* add restart error and add unit test

* opt code

* refactor code

* fix invalid write

* blob test

* blob test

* blob test

* blob test

* blob test

* support blob

* add str trim

* add str trim

* Merge remote-tracking branch 'origin/3.0' into feat/blob

* Merge remote-tracking branch 'origin/3.0' into feat/blob

* update test case

* fix invalid read

* fix invalid read

* fix invalid read

* add stmt2

* add stmt2

* add stmt2

* update parameter

* refactor test case

* refactor test case

* support blob

* support stmt2

* add sub

* support blob

* support blob

* support sub

* fix tmq crash

* support windows/darwin

* fix stmt2 bind row

* fix blob crash

* fix blob crash

* fix blob query error

* fix blob crash

* fix merge error

* refactor bse

* add blob transfer

* add blob transfer

* add transfer snapshot

* refactor code

* change log level

* change log level

* add test case

* refactor code

* revert taosBenchmark

* revert taosbench

* fix: improve error handling and encoding for file reading in grep_asserts_in_file function

* rm assert

* fix mem leak

* fix compile error

* fix conflict

* fix conflict

* fix compile error

* fix compile error

* fix pre check error

* fix pre check error

* fix compile error on windows

* fix error on window

* fix error on windows

* fix error on windows

* opt no-blob sql

* fix compile error on dawain

* fix compile error on dawain

* fix invalid read

* fix mem leak

* fix invalid read

* fix invalid read

* fix error on windows

* remove unused code

* refactor code

* fix mem leak

* fix mem leak

* rm unused code

* rm unused code

* fix invalid copy

* fix invalid copy

* fix invalid copy

* fix invalid copy

* fix invalid copy

* refactor code

* make ci happy

* refactor code

* change make

* update test case

* update ignore code

* update bse snapshot and test case

* update bse snapshot and test case

* refactor code

* change unit test

* update bse snapshot

* update bse snapshot

* fix test case

* fix bse snapshot

* fix bse snapshot

* fix snapshot transfer

* remove unused log

* support func query

* add test case

* forbidden  unsupport code

* merge 3.0

* merge 3.0

* change test case

* add forbidden code

* add forbidden code

* add code

* support length func

* support length func

* taosBenchmark support  blob

* support blob raw block

* support write raw block

* support more query

* Merge branch 'feat/blob' into feat/blob_test

* SBlobRow2

* change bse commit change

* refactor code

* rm exe test

* refactor code

* checke return code

* rename blob name

* refactor code

* refactor code

* refactor code

* refactor code

* fix unordere write

* fix unordere write

* fix row merge error

* fix row merge error

* fix row merge error

* fix row merge error

* support ordered data

* support ordered data

* Merge remote-tracking branch 'origin/3.0' into feat/blob_test

* fix stmt2 blob crash

* add not support write type

* add not support write type

* add not support write type

* rm exe

* fix col-formate error

* fix col-formate error

* fix mem leak

* refactor code

* add error code for single blob column restriction and update error message

* add error code for single blob column restriction and update error message

* add error code for single blob column restriction and update error message

* refactor code

* change error code

* make ci happy

---------

Co-authored-by: yihaoDeng <yhdeng@taosdata.com>
Co-authored-by: chenhaoran <haoran920c@163.com>

* package: update for main (#32091)

* fix compile error on windows (#32089)

* fix: source code merge issues

---------

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: WANG Xu <feici02@outlook.com>
Co-authored-by: minhuinie <nminhui@163.com>
Co-authored-by: huohong <sallyhuo@taosdata.com>
Co-authored-by: chenhaoran <haoran920c@163.com>
Co-authored-by: Nie Minhui <143420805+minhuinie@users.noreply.github.com>
Co-authored-by: Mario Peng <48949600+Pengrongkun@users.noreply.github.com>
Co-authored-by: Haojun Liao <hjxilinx@users.noreply.github.com>
Co-authored-by: dongming chen <cademfly@hotmail.com>
Co-authored-by: Linhe Huo <linhehuo@gmail.com>
Co-authored-by: huohong <346479823@qq.com>
Co-authored-by: Joel Brass <joel@jbrass.com>
Co-authored-by: WANG Xu <feici02@outlook.com>
Co-authored-by: Hongze Cheng <hzcheng@taosdata.com>
Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com>
Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
Co-authored-by: Kaili Xu <klxu@taosdata.com>
Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com>
Co-authored-by: WANG MINGMING <wangmm0220@gmail.com>
Co-authored-by: hongzhenliu <wluckyjob@gmail.com>
Co-authored-by: Daniel Clow <106956386+danielclow@users.noreply.github.com>
Co-authored-by: She Yanjie <57549981+sheyanjie-qq@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Simon Guan <slguan@taosdata.com>
Co-authored-by: plum-lihui <huili@taosdata.com>
Co-authored-by: Alex Duan <417921451@qq.com>
Co-authored-by: zelv01 <1101510017@qq.com>
Co-authored-by: Jing Sima <simondominic9997@outlook.com>
Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com>
Co-authored-by: Haojun Liao <hjliao@taosdata.com>
Co-authored-by: zyyang90 <zyyang@taosdata.com>
Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com>
Co-authored-by: facetosea <285808407@qq.com>
Co-authored-by: Simon Guan <guanshengliang@qq.com>
Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com>
Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com>
Co-authored-by: xiao-77 <berylbao@taosdata.com>
Co-authored-by: happyguoxy <happy_guoxy@163.com>
Co-authored-by: guozhenwei <2227465945@qq.com>
Co-authored-by: kevin men <men_shi_bin@163.com>
Co-authored-by: taos-support <it@taosdata.com>
Co-authored-by: Yihao Deng <luomoxyz@126.com>
Co-authored-by: Minglei Jin <49711132+stephenkgu@users.noreply.github.com>
Co-authored-by: yihaoDeng <yhdeng@taosdata.com>
2025-07-22 13:25:21 +08:00

2716 lines
100 KiB
C

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "operator.h"
#include "query.h"
#include "querytask.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "tglobal.h"
#include "tlog.h"
#include "ttime.h"
typedef enum SResultTsInterpType {
RESULT_ROW_START_INTERP = 1,
RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;
typedef struct SOpenWindowInfo {
SResultRowPosition pos;
uint64_t groupId;
} SOpenWindowInfo;
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
uint64_t groupId, SExecTaskInfo* pTaskInfo);
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
SExecTaskInfo* pTaskInfo) {
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
if (pResultRow == NULL || pTaskInfo->code != 0) {
*pResult = NULL;
qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
return pTaskInfo->code;
}
// set time window for current result
TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
*pResult = pResultRow;
return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup->win.ekey = ts;
pRowSup->prevTs = ts;
pRowSup->numOfRows += 1;
pRowSup->groupId = groupId;
}
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
pRowSup->startRowIndex = rowIndex;
pRowSup->numOfRows = 0;
pRowSup->win.skey = tsList[rowIndex];
pRowSup->groupId = groupId;
}
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
int32_t order, int64_t* pData) {
int32_t forwardRows = 0;
if (order == TSDB_ORDER_ASC) {
int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
if (end >= 0) {
forwardRows = end;
while (pData[end + pos] == ekey) {
forwardRows += 1;
++pos;
}
}
} else {
int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
if (end >= 0) {
forwardRows = end;
while (pData[end + pos] == ekey) {
forwardRows += 1;
++pos;
}
}
// int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
// if (end >= 0) {
// forwardRows = pos - end;
//
// if (pData[end] == ekey) {
// forwardRows += 1;
// }
// }
}
return forwardRows;
}
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
int32_t midPos = -1;
int32_t numOfRows;
if (num <= 0) {
return -1;
}
TSKEY* keyList = (TSKEY*)pValue;
int32_t firstPos = 0;
int32_t lastPos = num - 1;
if (order == TSDB_ORDER_DESC) {
// find the first position which is smaller than the key
while (1) {
if (key >= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos;
if (key < keyList[lastPos]) {
lastPos += 1;
if (lastPos >= num) {
return -1;
} else {
return lastPos;
}
}
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) {
firstPos = midPos + 1;
} else if (key > keyList[midPos]) {
lastPos = midPos - 1;
} else {
break;
}
}
} else {
// find the first position which is bigger than the key
while (1) {
if (key <= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos;
if (key > keyList[lastPos]) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
else
return lastPos;
}
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1u) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
}
return midPos;
}
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
int32_t num = -1;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
if (order == TSDB_ORDER_ASC) {
if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
if (item != NULL) {
item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
}
} else {
num = pDataBlockInfo->rows - startPos;
if (item != NULL) {
item->lastKey = pDataBlockInfo->window.ekey + step;
}
}
} else { // desc
if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
if (item != NULL) {
item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
}
} else {
num = pDataBlockInfo->rows - startPos;
if (item != NULL) {
item->lastKey = pDataBlockInfo->window.ekey + step;
}
}
}
return num;
}
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
SqlFunctionCtx* pCtx = pSup->pCtx;
int32_t index = 1;
for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
pCtx[k].start.key = INT64_MIN;
continue;
}
SFunctParam* pParam = &pCtx[k].param[0];
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
double v1 = 0, v2 = 0, v = 0;
if (prevRowIndex == -1) {
SGroupKeys* p = taosArrayGet(pPrevValues, index);
GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
} else {
GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
typeGetTypeModFromColInfo(&pColInfo->info));
}
GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
typeGetTypeModFromColInfo(&pColInfo->info));
#if 0
if (functionId == FUNCTION_INTERP) {
if (type == RESULT_ROW_START_INTERP) {
pCtx[k].start.key = prevTs;
pCtx[k].start.val = v1;
pCtx[k].end.key = curTs;
pCtx[k].end.val = v2;
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_VARBINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR ||
pColInfo->info.type == TSDB_DATA_TYPE_GEOMETRY) {
if (prevRowIndex == -1) {
// pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
} else {
pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
}
pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
}
}
} else if (functionId == FUNCTION_TWA) {
#endif
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
SPoint point2 = (SPoint){.key = curTs, .val = &v2};
SPoint point = (SPoint){.key = windowKey, .val = &v};
if (!fmIsElapsedFunc(pCtx[k].functionId)) {
taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
}
if (type == RESULT_ROW_START_INTERP) {
pCtx[k].start.key = point.key;
pCtx[k].start.val = v;
} else {
pCtx[k].end.key = point.key;
pCtx[k].end.val = v;
}
index += 1;
}
#if 0
}
#endif
}
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
if (type == RESULT_ROW_START_INTERP) {
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].start.key = INT64_MIN;
}
} else {
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].end.key = INT64_MIN;
}
}
}
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
TSKEY curTs = tsCols[pos];
SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
TSKEY lastTs = *(int64_t*)pTsKey->pData;
// lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
// start exactly from this point, no need to do interpolation
TSKEY key = ascQuery ? win->skey : win->ekey;
if (key == curTs) {
setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
return true;
}
// it is the first time window, no need to do interpolation
if (pTsKey->isNull && pos == 0) {
setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
} else {
TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
RESULT_ROW_START_INTERP, pSup);
}
return true;
}
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
TSKEY blockEkey, STimeWindow* win, bool* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t order = pInfo->binfo.inputTsOrder;
TSKEY actualEndKey = tsCols[endRowIndex];
TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
// not ended in current data block, do not invoke interpolation
if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
(*pRes) = false;
return code;
}
// there is actual end point of current time window, no interpolation needs
if (key == actualEndKey) {
setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
(*pRes) = true;
return code;
}
if (nextRowIndex < 0) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
TSKEY nextKey = tsCols[nextRowIndex];
doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
RESULT_ROW_END_INTERP, pSup);
(*pRes) = true;
return code;
}
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
if (pInterval->interval != pInterval->sliding &&
((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart))) {
return false;
}
return true;
}
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
}
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
bool ascQuery = (order == TSDB_ORDER_ASC);
int32_t precision = pInterval->precision;
getNextTimeWindow(pInterval, pNext, order);
// next time window is not in current block
if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
(pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
return -1;
}
if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
return -1;
}
TSKEY skey = ascQuery ? pNext->skey : pNext->ekey;
int32_t startPos = 0;
// tumbling time window query, a special case of sliding time window query
if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
startPos = prevPosition + 1;
} else {
if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
startPos = 0;
} else {
startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
}
}
/* interp query with fill should not skip time window */
// if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
// return startPos;
// }
/*
* This time window does not cover any data, try next time window,
* this case may happen when the time window is too small
*/
if (primaryKeys != NULL) {
if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
TSKEY next = primaryKeys[startPos];
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
pNext->skey = taosTimeTruncate(next, pInterval);
pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
} else {
pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
pNext->skey = pNext->ekey - pInterval->interval + 1;
}
} else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
TSKEY next = primaryKeys[startPos];
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
pNext->skey = taosTimeTruncate(next, pInterval);
pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
} else {
pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
pNext->ekey = pNext->skey + pInterval->interval - 1;
}
}
}
return startPos;
}
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
if (type == RESULT_ROW_START_INTERP) {
return pResult->startInterp == true;
} else {
return pResult->endInterp == true;
}
}
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
if (type == RESULT_ROW_START_INTERP) {
pResult->startInterp = true;
} else {
pResult->endInterp = true;
}
}
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (!pInfo->timeWindowInterpo) {
return code;
}
if (pBlock == NULL) {
code = TSDB_CODE_INVALID_PARA;
return code;
}
if (pBlock->pDataBlock == NULL) {
return code;
}
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
bool done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!done) { // it is not interpolated, now start to generated the interpolated value
bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
} else {
setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
}
// point interpolation does not require the end key time window interpolation.
// interpolation query does not generate the time window end interpolation
done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) {
int32_t endRowIndex = startPos + forwardRows - 1;
int32_t nextRowIndex = endRowIndex + 1;
// duplicated ts row does not involve in the interpolation of end value for current time window
int32_t x = endRowIndex;
while (x > 0) {
if (tsCols[x] == tsCols[x - 1]) {
x -= 1;
} else {
endRowIndex = x;
break;
}
}
TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
bool interp = false;
code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
win, &interp);
QUERY_CHECK_CODE(code, lino, _end);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
} else {
setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
if (pBlock->pDataBlock == NULL) {
return;
}
size_t num = taosArrayGetSize(pPrevKeys);
for (int32_t k = 0; k < num; ++k) {
SColumn* pc = taosArrayGet(pCols, k);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
if (colDataIsNull_s(pColInfo, i)) {
continue;
}
char* val = colDataGetData(pColInfo, i);
if (IS_VAR_DATA_TYPE(pkey->type)) {
if (IS_STR_DATA_BLOB(pkey->type)) {
memcpy(pkey->pData, val, blobDataTLen(val));
} else {
memcpy(pkey->pData, val, varDataTLen(val));
}
} else {
memcpy(pkey->pData, val, pkey->bytes);
}
break;
}
}
}
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
int32_t startPos = 0;
int32_t numOfOutput = pSup->numOfExprs;
SResultRow* pResult = NULL;
while (1) {
SListNode* pn = tdListGetHead(pResultRowInfo->openWindow);
SOpenWindowInfo* pOpenWin = (SOpenWindowInfo*)pn->data;
uint64_t groupId = pOpenWin->groupId;
SResultRowPosition* p1 = &pOpenWin->pos;
if (p->pageId == p1->pageId && p->offset == p1->offset) {
break;
}
SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
if (NULL == pr) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
T_LONG_JMP(pTaskInfo->env, terrno);
}
if (pr->closed) {
if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
T_LONG_JMP(pTaskInfo->env, terrno);
}
SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
taosMemoryFree(pNode);
continue;
}
STimeWindow w = pr->win;
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
T_LONG_JMP(pTaskInfo->env, terrno);
}
SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
if (!pTsKey) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
}
int64_t prevTs = *(int64_t*)pTsKey->pData;
if (groupId == pBlock->info.id.groupId) {
TSKEY curTs = pBlock->info.window.skey;
if (tsCols != NULL) {
curTs = tsCols[startPos];
}
doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
RESULT_ROW_END_INTERP, pSup);
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
pBlock->info.rows, numOfExprs);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pr);
SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
taosMemoryFree(pNode);
} else { // the remains are can not be closed yet.
break;
}
}
}
static bool tsKeyCompFn(void* l, void* r, void* param) {
TSKEY* lTS = (TSKEY*)l;
TSKEY* rTS = (TSKEY*)r;
SIntervalAggOperatorInfo* pInfo = param;
return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
}
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
}
/**
* @brief check if cur window should be filtered out by limit info
* @retval true if should be filtered out
* @retval false if not filtering out
* @note If no limit info, we skip filtering.
* If input/output ts order mismatch, we skip filtering too.
* eg. input ts order: desc, and output ts order: asc, limit: 10
* IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
* every tuple in every block.
* And the boundedQueue keeps refreshing all records with smaller ts key.
*/
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (!pOperatorInfo->limited // if no limit info, no filter will be applied
|| pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
// if input/output ts order mismatch, no filter
) {
return false;
}
if (pOperatorInfo->limit == 0) return true;
if (pOperatorInfo->pBQ == NULL) {
pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
}
bool shouldFilter = false;
// if BQ has been full, compare it with top of BQ
if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
}
if (shouldFilter) {
return true;
} else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
return false;
}
// cur win not been filtered out and not been pushed into BQ yet, push it into BQ
PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
*((TSKEY*)node.data) = win->skey;
if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
taosMemoryFree(node.data);
return true;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return false;
}
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
int32_t startPos = 0;
int32_t numOfOutput = pSup->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
uint64_t tableGroupId = pBlock->info.id.groupId;
bool ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL;
if (tableGroupId != pInfo->curGroupId) {
pInfo->handledGroupNum += 1;
if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
return true;
} else {
pInfo->curGroupId = tableGroupId;
destroyBoundedQueue(pInfo->pBQ);
pInfo->pBQ = NULL;
}
}
STimeWindow win =
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, ret);
}
TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
pInfo->binfo.inputTsOrder);
// prev time window not interpolation yet.
if (pInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
// window start key interpolation
ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
// qDebug("hashIntervalAgg1 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
// win.skey, win.ekey, startPos, forwardRows);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
doCloseWindow(pResultRowInfo, pInfo, pResult);
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
pInfo->binfo.inputTsOrder);
if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
break;
}
// null data, failed to allocate more memory buffer
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, code);
}
// qDebug("hashIntervalAgg2 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
// nextWin.skey, nextWin.ekey, startPos, forwardRows);
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
pInfo->binfo.inputTsOrder);
// window start(end) key interpolation
code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
// TODO: add to open window? how to close the open windows after input blocks exhausted?
#if 0
if ((ascScan && ekey <= pBlock->info.window.ekey) ||
(!ascScan && ekey >= pBlock->info.window.skey)) {
// window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
} else if (pInfo->timeWindowInterpo) {
addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
}
#endif
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
doCloseWindow(pResultRowInfo, pInfo, pResult);
}
if (pInfo->timeWindowInterpo) {
saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
}
return false;
}
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
// current result is done in computing final results.
if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pResult);
SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
taosMemoryFree(pNode);
}
}
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SOpenWindowInfo openWin = {0};
openWin.pos.pageId = pResult->pageId;
openWin.pos.offset = pResult->offset;
openWin.groupId = groupId;
SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
if (pn == NULL) {
code = tdListAppend(pResultRowInfo->openWindow, &openWin);
QUERY_CHECK_CODE(code, lino, _end);
return openWin.pos;
}
SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
code = tdListAppend(pResultRowInfo->openWindow, &openWin);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return openWin.pos;
}
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
TSKEY* tsCols = NULL;
if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
if (!pColDataInfo) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
}
tsCols = (int64_t*)pColDataInfo->pData;
if (tsCols[0] == 0) {
qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
tsCols[pBlock->info.rows - 1]);
}
if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
}
}
return tsCols;
}
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t scanFlag = MAIN_SCAN;
int64_t st = taosGetTimestampUs();
pInfo->cleanGroupResInfo = false;
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
GET_STM_RTINFO(pOperator->pTaskInfo));
QUERY_CHECK_CODE(code, lino, _end);
}
// the pDataBlock are always the same one, no need to call this again
code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end);
if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
}
code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->cleanGroupResInfo = true;
OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
if (!pStateColInfoData) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
}
int64_t gid = pBlock->info.id.groupId;
bool hasResult = false;
bool masterScan = true;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
int32_t bytes = pStateColInfoData->info.bytes;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
if (!pColInfoData) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
}
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
pRowSup->startRowIndex = 0;
struct SColumnDataAgg* pAgg = NULL;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
continue;
}
hasResult = true;
if (pStateColInfoData->pData == NULL) {
qError("%s:%d state column data is null", __FILE__, __LINE__);
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
char* val = colDataGetData(pStateColInfoData, j);
if (gid != pRowSup->groupId || !pInfo->hasKey) {
// todo extract method
if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
if (IS_STR_DATA_BLOB(pInfo->stateKey.type)) {
blobDataCopy(pInfo->stateKey.pData, val);
} else {
varDataCopy(pInfo->stateKey.pData, val);
}
} else {
memcpy(pInfo->stateKey.pData, val, bytes);
}
pInfo->hasKey = true;
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid);
} else if (compareVal(val, &pInfo->stateKey)) {
doKeepTuple(pRowSup, tsList[j], gid);
} else { // a new state window started
SResultRow* pResult = NULL;
// keep the time window for the closed time window.
STimeWindow window = pRowSup->win;
pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, ret);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
// here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid);
// todo extract method
if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
if (IS_STR_DATA_BLOB(pInfo->stateKey.type)) {
blobDataCopy(pInfo->stateKey.pData, val);
} else {
varDataCopy(pInfo->stateKey.pData, val);
}
} else {
memcpy(pInfo->stateKey.pData, val, bytes);
}
}
}
if (!hasResult) {
return;
}
SResultRow* pResult = NULL;
pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, ret);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStateWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t order = pInfo->binfo.inputTsOrder;
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
pInfo->cleanGroupResInfo = false;
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
QUERY_CHECK_CODE(code, lino, _end);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code =
projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
}
doStateWindowAggImpl(pOperator, pInfo, pBlock);
}
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->cleanGroupResInfo = true;
pOperator->status = OP_RES_TO_RETURN;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStateWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
setOperatorCompleted(pOperator);
break;
}
if (pBInfo->pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
return code;
}
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return code;
}
SSDataBlock* pBlock = pInfo->binfo.pRes;
code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
setOperatorCompleted(pOperator);
break;
}
if (pBlock->info.rows > 0) {
break;
}
}
size_t rows = pBlock->info.rows;
pOperator->resultInfo.totalRows += rows;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = (rows == 0) ? NULL : pBlock;
return code;
}
static void destroyStateWindowOperatorInfo(void* param) {
if (param == NULL) {
return;
}
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->stateKey.pData);
if (pInfo->pOperator) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
pInfo->cleanGroupResInfo);
pInfo->pOperator = NULL;
}
cleanupExprSupp(&pInfo->scalarSup);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param);
}
static void freeItem(void* param) {
SGroupKeys* pKey = (SGroupKeys*)param;
taosMemoryFree(pKey->pData);
}
void destroyIntervalOperatorInfo(void* param) {
if (param == NULL) {
return;
}
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
if (pInfo->pOperator) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
pInfo->cleanGroupResInfo);
pInfo->pOperator = NULL;
}
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSupp);
pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
taosArrayDestroy(pInfo->pInterpCols);
pInfo->pInterpCols = NULL;
taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
pInfo->pPrevValues = NULL;
cleanupGroupResInfo(&pInfo->groupResInfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
destroyBoundedQueue(pInfo->pBQ);
taosMemoryFreeClear(param);
}
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* tmp = NULL;
pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
{ // ts column
SColumn c = {0};
c.colId = 1;
c.slotId = pInfo->primaryTsIndex;
c.type = TSDB_DATA_TYPE_TIMESTAMP;
c.bytes = sizeof(int64_t);
tmp = taosArrayPush(pInfo->pInterpCols, &c);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
SGroupKeys key;
key.bytes = c.bytes;
key.type = c.type;
key.isNull = true; // to denote no value is assigned yet
key.pData = taosMemoryCalloc(1, c.bytes);
QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
tmp = taosArrayPush(pInfo->pPrevValues, &key);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
bool* pRes) {
// the primary timestamp column
bool needed = false;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* tmp = NULL;
for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = pCtx[i].pExpr;
if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
needed = true;
break;
}
}
if (needed) {
code = initWindowInterpPrevVal(pInfo);
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = pCtx[i].pExpr;
if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
SFunctParam* pParam = &pExpr->base.pParam[0];
SColumn c = *pParam->pCol;
tmp = taosArrayPush(pInfo->pInterpCols, &c);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
SGroupKeys key = {0};
key.bytes = c.bytes;
key.type = c.type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, c.bytes);
QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
tmp = taosArrayPush(pInfo->pPrevValues, &key);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
*pRes = needed;
return code;
}
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo){
SExecTaskInfo* pTaskInfo = pOper->pTaskInfo;
SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
pOper->status = OP_NOT_OPENED;
resetBasicOperatorState(&pIntervalInfo->binfo);
cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp, &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup,
pIntervalInfo->cleanGroupResInfo);
colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (code == 0) {
code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
&pTaskInfo->storageAPI.functionStore);
}
if (code == 0) {
code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
&pTaskInfo->storageAPI.functionStore);
}
if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL){
tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
}
if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
pIntervalInfo->curGroupId = UINT64_MAX;
}
pIntervalInfo->cleanGroupResInfo = false;
pIntervalInfo->handledGroupNum = 0;
pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;
taosArrayDestroy(pIntervalInfo->pInterpCols);
pIntervalInfo->pInterpCols = NULL;
if (pIntervalInfo->pPrevValues != NULL) {
taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
pIntervalInfo->pPrevValues = NULL;
initWindowInterpPrevVal(pIntervalInfo);
}
cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
destroyBoundedQueue(pIntervalInfo->pBQ);
pIntervalInfo->pBQ = NULL;
return code;
}
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
SIntervalAggOperatorInfo* pInfo = pOper->info;
return resetInterval(pOper, pInfo);
}
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = terrno;
lino = __LINE__;
goto _error;
}
pOperator->pPhyNode = pPhyNode;
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
SExprSupp* pSup = &pOperator->exprSupp;
pSup->hasWindowOrGroup = true;
pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 512);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error);
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
&pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
SInterval interval = {.interval = pPhyNode->interval,
.sliding = pPhyNode->sliding,
.intervalUnit = pPhyNode->intervalUnit,
.slidingUnit = pPhyNode->slidingUnit,
.offset = pPhyNode->offset,
.precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
.timeRange = pPhyNode->timeRange};
calcIntervalAutoOffset(&interval);
STimeWindowAggSupp as = {
.waterMark = pPhyNode->window.watermark,
.calTrigger = pPhyNode->window.triggerType,
.maxTs = INT64_MIN,
};
pInfo->win = pTaskInfo->window;
pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
pInfo->interval = interval;
pInfo->twAggSup = as;
pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
pInfo->limited = true;
pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
}
if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
pInfo->slimited = true;
pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
pInfo->curGroupId = UINT64_MAX;
}
if (pPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
pTaskInfo->pStreamRuntimeInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->timeWindowInterpo = false;
code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
QUERY_CHECK_CODE(code, lino, _error);
if (pInfo->timeWindowInterpo) {
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error;
}
}
pInfo->pOperator = pOperator;
pInfo->cleanGroupResInfo = false;
initResultRowInfo(&pInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorResetStateFn(pOperator, resetIntervalOperState);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyIntervalOperatorInfo(pInfo);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
return code;
}
// todo handle multiple timeline cases. assume no timeline interweaving
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
if (!pColInfoData) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
}
bool masterScan = true;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
int64_t gid = pBlock->info.id.groupId;
int64_t gap = pInfo->gap;
if (!pInfo->reptScan) {
pInfo->reptScan = true;
pInfo->winSup.prevTs = INT64_MIN;
}
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
pRowSup->startRowIndex = 0;
// In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid);
} else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pRowSup, tsList[j], gid);
} else { // start a new session window
// start a new session window
if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window
SResultRow* pResult = NULL;
// keep the time window for the closed time window.
STimeWindow window = pRowSup->win;
int32_t ret =
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, ret);
}
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
ret =
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
// here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid);
}
}
SResultRow* pResult = NULL;
pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, ret);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSessionAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExprSupp* pSup = &pOperator->exprSupp;
if (pOperator->status == OP_RES_TO_RETURN) {
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
setOperatorCompleted(pOperator);
break;
}
if (pBInfo->pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
(*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
return code;
}
int64_t st = taosGetTimestampUs();
int32_t order = pInfo->binfo.inputTsOrder;
SOperatorInfo* downstream = pOperator->pDownstream[0];
pInfo->cleanGroupResInfo = false;
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
GET_STM_RTINFO(pOperator->pTaskInfo));
QUERY_CHECK_CODE(code, lino, _end);
}
// the pDataBlock are always the same one, no need to call this again
code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
QUERY_CHECK_CODE(code, lino, _end);
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
}
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
// restore the value
pOperator->status = OP_RES_TO_RETURN;
code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->cleanGroupResInfo = true;
code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
setOperatorCompleted(pOperator);
break;
}
if (pBInfo->pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
return code;
}
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
SStateWindowOperatorInfo* pInfo = pOper->info;
SExecTaskInfo* pTaskInfo = pOper->pTaskInfo;
SStateWinodwPhysiNode* pPhynode = (SStateWinodwPhysiNode*)pOper->pPhyNode;
pOper->status = OP_NOT_OPENED;
resetBasicOperatorState(&pInfo->binfo);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
pInfo->cleanGroupResInfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (code == 0) {
code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
&pTaskInfo->storageAPI.functionStore);
}
if (code == 0) {
code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->window.pExprs, NULL,
&pTaskInfo->storageAPI.functionStore);
}
pInfo->cleanGroupResInfo = false;
pInfo->hasKey = false;
cleanupGroupResInfo(&pInfo->groupResInfo);
memset(pInfo->stateKey.pData, 0, pInfo->stateKey.bytes);
return code;
}
// todo make this as an non-blocking operator
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = terrno;
goto _error;
}
pOperator->pPhyNode = pStateNode;
pOperator->exprSupp.hasWindowOrGroup = true;
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
pInfo->stateCol = extractColumnFromColumnNode(pColNode);
pInfo->stateKey.type = pInfo->stateCol.type;
pInfo->stateKey.bytes = pInfo->stateCol.bytes;
pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
if (pInfo->stateKey.pData == NULL) {
goto _error;
}
pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
pTaskInfo->pStreamRuntimeInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->twAggSup =
(STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->tsSlotId = tsSlotId;
pInfo->pOperator = pOperator;
pInfo->cleanGroupResInfo = false;
pInfo->trueForLimit = pStateNode->trueForLimit;
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorResetStateFn(pOperator, resetStatewindowOperState);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyStateWindowOperatorInfo(pInfo);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}
void destroySWindowOperatorInfo(void* param) {
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
if (pInfo == NULL) {
return;
}
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
if (pInfo->pOperator) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
pInfo->cleanGroupResInfo);
pInfo->pOperator = NULL;
}
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSupp);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param);
}
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
SSessionAggOperatorInfo* pInfo = pOper->info;
SExecTaskInfo* pTaskInfo = pOper->pTaskInfo;
SSessionWinodwPhysiNode* pPhynode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
pOper->status = OP_NOT_OPENED;
resetBasicOperatorState(&pInfo->binfo);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
pInfo->cleanGroupResInfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (code == 0) {
code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
&pTaskInfo->storageAPI.functionStore);
}
if (code == 0) {
code = resetExprSupp(&pInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
&pTaskInfo->storageAPI.functionStore);
}
pInfo->cleanGroupResInfo = false;
pInfo->winSup = (SWindowRowsSup){0};
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
cleanupGroupResInfo(&pInfo->groupResInfo);
return code;
}
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = terrno;
goto _error;
}
pOperator->pPhyNode = pSessionNode;
pOperator->exprSupp.hasWindowOrGroup = true;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
pInfo->gap = pSessionNode->gap;
initResultRowInfo(&pInfo->binfo.resultRowInfo);
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
if (pSessionNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}
code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
pTaskInfo->pStreamRuntimeInfo);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pOperator = pOperator;
pInfo->cleanGroupResInfo = false;
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pOperator->pTaskInfo = pTaskInfo;
setOperatorResetStateFn(pOperator, resetSessionAggOperState);
code = appendDownstream(pOperator, &downstream, 1);
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}
void destroyMAIOperatorInfo(void* param) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
taosMemoryFreeClear(param);
}
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (NULL == pResult) {
return pResult;
}
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
return pResult;
}
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
SExprSupp* pExprSup, SAggSupporter* pAggSup) {
if (*pResult == NULL) {
*pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
if (*pResult == NULL) {
return terrno;
}
}
// set time window for current result
(*pResult)->win = (*win);
return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
}
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
SInterval* pInterval = &iaInfo->interval;
int32_t startPos = 0;
int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
// there is an result exists
if (miaInfo->curTs != INT64_MIN) {
if (ts != miaInfo->curTs) {
finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
miaInfo->curTs = ts;
}
} else {
miaInfo->curTs = ts;
}
STimeWindow win = {0};
win.skey = miaInfo->curTs;
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
T_LONG_JMP(pTaskInfo->env, ret);
}
int32_t currPos = startPos;
STimeWindow currWin = win;
while (++currPos < pBlock->info.rows) {
if (tsCols[currPos] == miaInfo->curTs) {
continue;
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
miaInfo->curTs = tsCols[currPos];
currWin.skey = miaInfo->curTs;
currWin.ekey =
taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
startPos = currPos;
ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
T_LONG_JMP(pTaskInfo->env, ret);
}
miaInfo->curTs = currWin.skey;
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
pRes->info.id.groupId = pMiaInfo->groupId;
pMiaInfo->curTs = INT64_MIN;
pMiaInfo->groupId = 0;
}
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
SIntervalAggOperatorInfo* pIaInfo = pMiaInfo->intervalAggOperatorInfo;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pIaInfo->binfo.pRes;
SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = NULL;
if (pMiaInfo->prefetchedBlock == NULL) {
pBlock = getNextBlockFromDownstream(pOperator, 0);
} else {
pBlock = pMiaInfo->prefetchedBlock;
pMiaInfo->prefetchedBlock = NULL;
pMiaInfo->groupId = pBlock->info.id.groupId;
}
// no data exists, all query processing is done
if (pBlock == NULL) {
// close last unclosed time window
if (pMiaInfo->curTs != INT64_MIN) {
finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
cleanupAfterGroupResultGen(pMiaInfo, pRes);
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
}
setOperatorCompleted(pOperator);
break;
}
if (pMiaInfo->groupId == 0) {
if (pMiaInfo->groupId != pBlock->info.id.groupId) {
pMiaInfo->groupId = pBlock->info.id.groupId;
pRes->info.id.groupId = pMiaInfo->groupId;
}
} else {
if (pMiaInfo->groupId != pBlock->info.id.groupId) {
// if there are unclosed time window, close it firstly.
if (pMiaInfo->curTs == INT64_MIN) {
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
T_LONG_JMP(pTaskInfo->env, terrno);
}
finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
pMiaInfo->prefetchedBlock = pBlock;
cleanupAfterGroupResultGen(pMiaInfo, pRes);
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
if (pRes->info.rows == 0) {
// After filtering for last group, the result is empty, so we need to continue to process next group
continue;
} else {
break;
}
} else {
// continue
pRes->info.id.groupId = pMiaInfo->groupId;
}
}
pRes->info.scanFlag = pBlock->info.scanFlag;
code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end);
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
if (pRes->info.rows >= pOperator->resultInfo.capacity) {
break;
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
}
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
SIntervalAggOperatorInfo* iaInfo = pMiaInfo->intervalAggOperatorInfo;
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return code;
}
SSDataBlock* pRes = iaInfo->binfo.pRes;
blockDataCleanup(pRes);
if (iaInfo->binfo.mergeResultBlock) {
while (1) {
if (pOperator->status == OP_EXEC_DONE) {
break;
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
doMergeAlignedIntervalAgg(pOperator);
}
} else {
doMergeAlignedIntervalAgg(pOperator);
}
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
(*ppRes) = (rows == 0) ? NULL : pRes;
return code;
}
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;
uint64_t groupId; // current groupId
int64_t curTs; // current ts
SSDataBlock* prefetchedBlock;
SResultRow* pResultRow;
pInfo->groupId = 0;
pInfo->curTs = INT64_MIN;
pInfo->prefetchedBlock = NULL;
pInfo->pResultRow = NULL;
return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
}
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (miaInfo == NULL || pOperator == NULL) {
code = terrno;
goto _error;
}
pOperator->pPhyNode = pNode;
miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
if (miaInfo->intervalAggOperatorInfo == NULL) {
code = terrno;
goto _error;
}
SInterval interval = {.interval = pNode->interval,
.sliding = pNode->sliding,
.intervalUnit = pNode->intervalUnit,
.slidingUnit = pNode->slidingUnit,
.offset = pNode->offset,
.precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
.timeRange = pNode->timeRange};
calcIntervalAutoOffset(&interval);
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExprSupp* pSup = &pOperator->exprSupp;
pSup->hasWindowOrGroup = true;
code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
pTaskInfo->pStreamRuntimeInfo);
QUERY_CHECK_CODE(code, lino, _error);
miaInfo->curTs = INT64_MIN;
iaInfo->win = pTaskInfo->window;
iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
iaInfo->interval = interval;
iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 512);
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&iaInfo->binfo, pResBlock);
code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
QUERY_CHECK_CODE(code, lino, _error);
iaInfo->timeWindowInterpo = false;
code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
QUERY_CHECK_CODE(code, lino, _error);
if (iaInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
}
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error);
iaInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
false, OP_NOT_OPENED, miaInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
code = appendDownstream(pOperator, &downstream, 1);
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;
_error:
if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}
//=====================================================================================================================
// merge interval operator
typedef struct SMergeIntervalAggOperatorInfo {
SIntervalAggOperatorInfo intervalAggOperatorInfo;
SList* groupIntervals;
SListIter groupIntervalsIter;
bool hasGroupId;
uint64_t groupId;
SSDataBlock* prefetchedBlock;
bool inputBlocksFinished;
} SMergeIntervalAggOperatorInfo;
typedef struct SGroupTimeWindow {
uint64_t groupId;
STimeWindow window;
} SGroupTimeWindow;
void destroyMergeIntervalOperatorInfo(void* param) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
taosMemoryFreeClear(param);
}
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
STimeWindow* newWin) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
bool ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
int32_t code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
}
SListIter iter = {0};
tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
SListNode* listNode = NULL;
while ((listNode = tdListNext(&iter)) != NULL) {
SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
if (prevGrpWin->groupId != tableGroupId) {
continue;
}
STimeWindow* prevWin = &prevGrpWin->window;
if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
taosMemoryFreeClear(tmp);
}
}
return TSDB_CODE_SUCCESS;
}
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag, SSDataBlock* pResultBlock) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
int32_t startPos = 0;
int32_t numOfOutput = pExprSup->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
uint64_t tableGroupId = pBlock->info.id.groupId;
bool ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
iaInfo->binfo.inputTsOrder);
int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, ret);
}
TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
iaInfo->binfo.inputTsOrder);
if (forwardRows <= 0) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
// prev time window not interpolation yet.
if (iaInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
// window start key interpolation
ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
forwardRows, pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&win) is closed
int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
iaInfo->binfo.inputTsOrder);
if (startPos < 0) {
break;
}
// null data, failed to allocate more memory buffer
code =
setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, code);
}
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
iaInfo->binfo.inputTsOrder);
// window start(end) key interpolation
code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
forwardRows, pBlock->info.rows, numOfOutput);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&nextWin) is closed
code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
}
if (iaInfo->timeWindowInterpo) {
saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
}
}
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExprSupp* pExpSupp = &pOperator->exprSupp;
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return code;
}
SSDataBlock* pRes = iaInfo->binfo.pRes;
blockDataCleanup(pRes);
code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end);
if (!miaInfo->inputBlocksFinished) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = NULL;
if (miaInfo->prefetchedBlock == NULL) {
pBlock = getNextBlockFromDownstream(pOperator, 0);
} else {
pBlock = miaInfo->prefetchedBlock;
miaInfo->groupId = pBlock->info.id.groupId;
miaInfo->prefetchedBlock = NULL;
}
if (pBlock == NULL) {
tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
miaInfo->inputBlocksFinished = true;
break;
}
if (!miaInfo->hasGroupId) {
miaInfo->hasGroupId = true;
miaInfo->groupId = pBlock->info.id.groupId;
} else if (miaInfo->groupId != pBlock->info.id.groupId) {
miaInfo->prefetchedBlock = pBlock;
break;
}
pRes->info.scanFlag = pBlock->info.scanFlag;
code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end);
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
}
pRes->info.id.groupId = miaInfo->groupId;
}
if (miaInfo->inputBlocksFinished) {
SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
if (listNode != NULL) {
SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
pRes->info.id.groupId = grpWin->groupId;
}
}
if (pRes->info.rows == 0) {
setOperatorCompleted(pOperator);
}
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = (rows == 0) ? NULL : pRes;
return code;
}
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
SMergeIntervalAggOperatorInfo* pInfo = pOper->info;
pInfo->hasGroupId = false;
pInfo->groupId = 0;
pInfo->prefetchedBlock = NULL;
pInfo->inputBlocksFinished = false;
tdListEmpty(pInfo->groupIntervals);
SIntervalAggOperatorInfo* pIntervalInfo = &pInfo->intervalAggOperatorInfo;
return resetInterval(pOper, pIntervalInfo);
}
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pMergeIntervalInfo == NULL || pOperator == NULL) {
code = terrno;
goto _error;
}
pOperator->pPhyNode = pIntervalPhyNode;
SInterval interval = {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
.timeRange = pIntervalPhyNode->timeRange};
calcIntervalAutoOffset(&interval);
pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
pIntervalInfo->win = pTaskInfo->window;
pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
pIntervalInfo->interval = interval;
pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
SExprSupp* pExprSupp = &pOperator->exprSupp;
pExprSupp->hasWindowOrGroup = true;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pIntervalInfo->binfo, pResBlock);
code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
QUERY_CHECK_CODE(code, lino, _error);
pIntervalInfo->timeWindowInterpo = false;
code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
QUERY_CHECK_CODE(code, lino, _error);
if (pIntervalInfo->timeWindowInterpo) {
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error;
}
}
pIntervalInfo->pOperator = pOperator;
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;
_error:
if (pMergeIntervalInfo != NULL) {
destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}