TDengine/source/dnode/vnode/src/bse/bseTable.c
Pan Wei f0df7a26d7
Feat/ts 6100 3.0.0722m (#32103)
* migrate system-test/2-query

* revert file

* update case.task

* resolve script migrate

* run new test framework on new_testcases

* migrate system-test/2-query

* format docstring

* fix test validation

* fix test validation

* fix error

* migrate army case

* migrate army case

* fix error

* migrate system-test/2-query

* migrate system-test/2-query

* migrate system-test/2-query

* migrate system-test/2-query

* test exe time

* fix ci error

* migrate system-test/1-insert

* new common function

* migrate system-test/1-insert

* fix ci error

* migrate system-test/1-insert

* feat: add configuration and script for memory allocator settings

* fix: correct HEAPPROFILE path and remove redundant metadata_thp setting in memory allocator script

* fix ci error

* migrate system-test/1-insert, 2-query

* feat:insert into subquery (#31401) (#31710)

* migrate system-test/99-TDcase

* feat(gpt): add grant check for gpt. (#31708)

* migrate system-test/99-TDcase

* migrate system-test/99-TDcase

* migrate system-test/99-TDcase

* fix/send-heartbeat-statis (#31680)

* migrate system-test/7-tmq

* chore: support cmake option TAOSWS_GIT_TAG like taosadapter [skip ci] (#31486)

* add system-test/6/cluster test

* chore: move default branch from main to 3.3.6 for adapter/taosws

* fix docstring validation

* migrate system-test/7-tmq

* migrate system-test/7-tmq

* feat: add set_taos_malloc_env configuration and update related scripts

* Update 03-kubernetes.md

* enh: add log for snapshot (#31681)

* simple test

* more

* migrate system-test/7-tmq

* migrate system-test 0-others cases

* migrate system-test/7-tmq

* fix docstring validation

* migrate system-test/7-tmq

* fix docstring validation

* fix: invalid queue

* fix docstring validation

* refactor: reorganize memory allocator script constants and improve mode descriptions

* enh: TD-36324-improve-sync-heartbeat-log (#31727)

* recover log level

* fix: taosd crush in query when insufficient memory (#31746)

Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>

* fix: overflow check in snprintf (#31780)

* migrate system-test/7-tmq

* migrate some cases

* migrate system-test/7-tmq

* fix failed cases

* fix failed cases

* migrate some cases

* fix failed cases

* fix failed cases

* migrate testcases

* fix: update environment file path in taosd.service and adjust set_taos_malloc.sh configuration

* fix: set lcn before do s3 migrate. (#31782)

* migrate testcases

* fix failed cases

* fix: add condition to set default malloc config for taosd service

* fix: update usage message in set_taos_malloc.sh to include quiet mode option

* fix: add quiet mode option to set malloc config in install script

* fix: update set_taos_malloc.sh to improve output messages and adjust default malloc config invocation

* recover log level

* fix(tmq): [TS-6569]tdb error if write tmq meta data in multi thread (#31808)

* fix: the calculation of dnode uptime (#31832)

* migrate testcase

* fix: correct timediff function bug and redress docs (#31798)

* fix: enhance malloc configuration for taosd and taosadapter in install script

* fix: TD-36442 show full condition (#31796)

* enh/TD-36466-sync-heartbeat (#31805)

* fix(plan) virtual table support BI moudle when use in select (#31787)

* fix failed cases

* enh: TS-5926-force-repair-wal (#31828)

* migrate testcase

* rename taos & taosd (#31855)

* migrate testcase

* migrate testcase

* fix failed cases

* fix: update environment file paths in taosd.service and set_taos_malloc.sh

* feat: add performance tuning documentation for memory optimization and set_taos_malloc.sh usage

* fix: update file paths in performance tuning documentation for consistency

* feat: add performance tuning documentation for memory allocator configuration script

* fix: add missing line breaks for improved readability in performance tuning documentation

* fix: adjust heading levels for consistency in performance tuning documentation

* fix cases

* fix new_testcases

* fix failed cases

* fix(query): support show tags on virtual table (#31831)

* fix failed cases

* docs: replace mysql screenshot (#31888)

* feat: use the new TDengine product name (#31859)

* merge 3.0

* fix failed cases

* fix failed cases

* merge 3.0

* fix cases

* fix cases

* doc: Update 03-stream.md (#31675)

* chore(deps): bump requests from 2.27.1 to 2.32.4 in /test (#31326)

Bumps [requests](https://github.com/psf/requests) from 2.27.1 to 2.32.4.
- [Release notes](https://github.com/psf/requests/releases)
- [Changelog](https://github.com/psf/requests/blob/main/HISTORY.md)
- [Commits](https://github.com/psf/requests/compare/v2.27.1...v2.32.4)

---
updated-dependencies:
- dependency-name: requests
  dependency-version: 2.32.4
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* fix new cases

* fix:Convert line endings from LF to CRLF for ans file

* build(deps): bump golang.org/x/net in /tools/keeper (#30811)

Bumps [golang.org/x/net](https://github.com/golang/net) from 0.36.0 to 0.38.0.
- [Commits](https://github.com/golang/net/compare/v0.36.0...v0.38.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-version: 0.38.0
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump urllib3 from 1.26.20 to 2.5.0 in /test (#31414)

Bumps [urllib3](https://github.com/urllib3/urllib3) from 1.26.20 to 2.5.0.
- [Release notes](https://github.com/urllib3/urllib3/releases)
- [Changelog](https://github.com/urllib3/urllib3/blob/main/CHANGES.rst)
- [Commits](https://github.com/urllib3/urllib3/compare/1.26.20...2.5.0)

---
updated-dependencies:
- dependency-name: urllib3
  dependency-version: 2.5.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump org.apache.tomcat.embed:tomcat-embed-core (#31392)

Bumps org.apache.tomcat.embed:tomcat-embed-core from 9.0.104 to 9.0.106.

---
updated-dependencies:
- dependency-name: org.apache.tomcat.embed:tomcat-embed-core
  dependency-version: 9.0.106
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump org.apache.tomcat.embed:tomcat-embed-core (#31393)

Bumps org.apache.tomcat.embed:tomcat-embed-core from 9.0.104 to 9.0.106.

---
updated-dependencies:
- dependency-name: org.apache.tomcat.embed:tomcat-embed-core
  dependency-version: 9.0.106
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump org.apache.kafka:kafka-clients (#31341)

Bumps org.apache.kafka:kafka-clients from 3.9.0 to 3.9.1.

---
updated-dependencies:
- dependency-name: org.apache.kafka:kafka-clients
  dependency-version: 3.9.1
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* fix: add lock when calculating log buffer start/end (#31883)

* fix new cases

* fix new cases

* fix failed cases

* feat: new stream (#31678)

* fix: windows compile issue

* test: add vtable cases (#31829)

* fix: windows compile issues

* test:add test cases

* fix: windows compile issue

* case: em-4 stream case submit

* test: stream4_sub1 found bug2

* test: submit test_scene_meters_bug2.py

* add stream parameters example

* feat: [TS-6100] Do not translate const value as column.

* Feat/ts 6100 3.0 zlv (#31747)

* modify asan exampel

* modify asan exampel

* add example

* add example

* modify case example

---------

Co-authored-by: zelv01 <1101510017@qq.com>

* feat(stream): fix memory leak

* modify sliding example

* test: update test case.

* feat(stream): fix conflicts

* fix: add offset case 10a 10s 10m 10h 10d

* feat(stream): fix conflicts

* chore(stream): rename case name #TS-6100

* add case

* modify example

* fix: windows compile issues

* fix: data null check

* feat: [TS-6100] Forbid where when using %%trows (#31827)

* feat: [TS-6100] Forbid where when using %%trows

* test: update cases

* feat: [TS-6100] Fix leaks.

---------

Co-authored-by: Simon Guan <guanshengliang@qq.com>

* test: reproduce bugs

* test: update test case.

* test: update test case.

* feat: [TS-6100] Fix leaks.

* test: add cases

* Feat/ts 6100 3.0.pw10 (#31841)

* enh: add operator reset func

* fix: merge join reset issue

* fix: memory issues

* fix: add debug assert

* fix: memory issues

* fix: memory leak

* fix: memory issues

* fix taos log miss

* fix: case issue

* fix: case issue

* fix: case issues

* fix: drop dnode issue

* fix: memory issues

* fix: memory issues

* fix: memory leak issues

* fix: recalculate time range issue

* fix: add debug log

* fix: memory issues

* fix: enable case asan

* Update streamlist_for_ci.task

* fix: case asan issue

* fix: stream name issue

* fix: external window compile issues

* fix: deploy memory issue

* fix: ahandle issue

* fix: ahandle issue

* fix: ahandle issue

* fix: virtual table reader list issue

* fix: log info

* fix: msg error

* fix: virtual table addr list issue

* fix: memory issues

* fix: memory leak issue

* fix: memory issues

* fix: memory free issues

* fix: memory issues

* fix: snode deploy issue

* fix: mnode reader issue

* fix: memory issues

* fix: add debug test

* enh: add ignore nodata trigger

* fix: memory leaks

* fix: configuration issue

* fix: memory issue

* fix: external window issue

* fix: external window issues

* fix: external window placeholder issue

* fix: placeholder function init issues

* fix: memory leak issue

* fix: add debug log

* fix: compile issues

* fix: double free issue

* fix: runner addr update issue

* fix: msg rsp issue

* fix: external window reset issue

* fix: configuration issue

* fix: deploy msg issue

* fix: compile issue

---------

Co-authored-by: huohong <sallyhuo@taosdata.com>

* test: reproduce bugs

* fix: add sliding interval combine case

* test: add cases

* test: add recalc test.

* test: reproduce bugs

* case : add vt ts is null check

* modify case

* bug: submit test_idmp_meters_bug3.py

* test: add test for recalc.

* test: add cases

* fix: error code check

* test: add cases

* fix(stream): scan wal with schema in that version

* add case

* test: add cases

* test: update test case.

* fix: windows compile issues

* add case

* test: add cases (#31845)

* modify case

* fix: reset interpPrev

* test: add test_idmp_meters bug4 and bug3

* add case

* fix(stream): opti wal interface

* fix: remove test_idmp_meters_bug5.py

* test: add cases

* fix(stream): fix ts data fetch for virtual tables

* cancel asan case

* test: update test case.

* test: update test case.

* add case

* test: add cases

* test: add cases

* test: add case test_idmp_meters_bug5.py

* test: update test case.

* fix(stream): tmq error

* test: add cases

* feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use.

* fix(stream): optimize val scan logic

* test: add test_recalc_expired_time.py  to ci.

* test: update test case.

* test: update test case.

* feat: [TS-6100] Fix fill range check

* fix(stream): optimize val scan logic

* add case

* test: modify for partition by %%1

* test: add fun case stream4_sub7

* fix(stream): optimize val scan logic

* add case

* feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS.

* test: add test for recalc.

* test: use stream_options.

* fix: some cases error.

* test: remove recalc from ci.

* fix: ci case issues (#31880)

* enh: add operator reset func

* fix: merge join reset issue

* fix: memory issues

* fix: add debug assert

* fix: memory issues

* fix: memory leak

* fix: memory issues

* fix taos log miss

* fix: case issue

* fix: case issue

* fix: case issues

* fix: drop dnode issue

* fix: memory issues

* fix: memory issues

* fix: memory leak issues

* fix: recalculate time range issue

* fix: add debug log

* fix: memory issues

* fix: enable case asan

* Update streamlist_for_ci.task

* fix: case asan issue

* fix: stream name issue

* fix: external window compile issues

* fix: deploy memory issue

* fix: ahandle issue

* fix: ahandle issue

* fix: ahandle issue

* fix: virtual table reader list issue

* fix: log info

* fix: msg error

* fix: virtual table addr list issue

* fix: memory issues

* fix: memory leak issue

* fix: memory issues

* fix: memory free issues

* fix: memory issues

* fix: snode deploy issue

* fix: mnode reader issue

* fix: memory issues

* fix: add debug test

* enh: add ignore nodata trigger

* fix: memory leaks

* fix: configuration issue

* fix: memory issue

* fix: external window issue

* fix: external window issues

* fix: external window placeholder issue

* fix: placeholder function init issues

* fix: memory leak issue

* fix: add debug log

* fix: compile issues

* fix: double free issue

* fix: runner addr update issue

* fix: msg rsp issue

* fix: external window reset issue

* fix: configuration issue

* fix: deploy msg issue

* fix: compile issue

* fix: external window idx issue

* fix: ci issues

---------

Co-authored-by: huohong <sallyhuo@taosdata.com>

* fix(stream): fix compilation error

* fix(stream): optimize val scan logic

* test:add test cases

* test: modify case

* fix: external agg error

* test(stream): tobacco scene testing #TD-36514

* test: add stream cases (#31885)

* fix: windows compile issue

* fix: calc timerange

* fix: windows compile issue

* modify case

* fix(stream): compile error

* test: remove one debug test case file

* test: modify

* test: add test cases

* test: reproduce bugs

* test: reproduce bugs

* feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868)

* feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM.

* test: update case

---------

Co-authored-by: Simon Guan <guanshengliang@qq.com>

* add example

* add example

* modify case example

* modify case

* test:alter sql

* test: add stream5 case

* fix(stream): get schema error with version

* test: add delete recalc test py.

* test: remove bug cases

* test: stream5 case test passed

* test: add state cases (#31893)

* fix(stream): compile error

* test: modify case

* test: add cases

* test: add test.

* test: update test case.

* chore(test): fix case err

* test: update test case.

* fix: align data get

* fix(stream): fix row index of datablock written into data cache

* fix: put align data

* test: update test case.

* test: add test cases for virtual table

* chore(test): fix case err #TD-36514

* add case

* test: add test for water mark.

* test: add meters bug6 for stream5

* test: add cases (#31903)

* test: add test for recalc.

* feat: [TS-6100] %%trows can only be used when event type is window close.

* test: add precision of database for ms/us/ns

* modify case

* add case

* add case

* test: add test to ci.

* modify case

* fix: ci case issues (#31904)

* enh: add operator reset func

* fix: merge join reset issue

* fix: memory issues

* fix: add debug assert

* fix: memory issues

* fix: memory leak

* fix: memory issues

* fix taos log miss

* fix: case issue

* fix: case issue

* fix: case issues

* fix: drop dnode issue

* fix: memory issues

* fix: memory issues

* fix: memory leak issues

* fix: recalculate time range issue

* fix: add debug log

* fix: memory issues

* fix: enable case asan

* Update streamlist_for_ci.task

* fix: case asan issue

* fix: stream name issue

* fix: external window compile issues

* fix: deploy memory issue

* fix: ahandle issue

* fix: ahandle issue

* fix: ahandle issue

* fix: virtual table reader list issue

* fix: log info

* fix: msg error

* fix: virtual table addr list issue

* fix: memory issues

* fix: memory leak issue

* fix: memory issues

* fix: memory free issues

* fix: memory issues

* fix: snode deploy issue

* fix: mnode reader issue

* fix: memory issues

* fix: add debug test

* enh: add ignore nodata trigger

* fix: memory leaks

* fix: configuration issue

* fix: memory issue

* fix: external window issue

* fix: external window issues

* fix: external window placeholder issue

* fix: placeholder function init issues

* fix: memory leak issue

* fix: add debug log

* fix: compile issues

* fix: double free issue

* fix: runner addr update issue

* fix: msg rsp issue

* fix: external window reset issue

* fix: configuration issue

* fix: deploy msg issue

* fix: compile issue

* fix: external window idx issue

* fix: ci issues

* fix: ci case issues

* fix: drop dnode issue

---------

Co-authored-by: huohong <sallyhuo@taosdata.com>

* fix(stream): ci error

* test: update test case.

* feat: [TS-6100] Disable some failed UT.

* feat: [TS-6100] Fix virtual table

* test: add bug 5.

* test: add test delete recalc to ci.

* test: add bug 6.

* test(stream): tobacco scene #TD-36514

* fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext

Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>

* test: add case stream6

* fix(stream): implement some pending features in trigger task

* modify case

* modify case

* fix: case issues

* modify case

* test: add recalc for warter mark.

* fix(stream): fix count window trigger of virtual tables

* fix(stream): memory leak

* test: fix run err.

* test: add stream6 bug7

* fix: adjust format

* test(stream): tobacco scene testing #TD-36514

* test: change bug7 with update window1 and 2

* test: add test bug 7.

* case: restore write 3 window

* fix: windows compile issue

* fix: notify

* test: add cases

* modify case

* test: update test case.

* test(stream): toobacco scene testing #TD-36514

---------

Co-authored-by: Simon Guan <slguan@taosdata.com>
Co-authored-by: plum-lihui <huili@taosdata.com>
Co-authored-by: Alex Duan <417921451@qq.com>
Co-authored-by: zelv01 <1101510017@qq.com>
Co-authored-by: Jing Sima <simondominic9997@outlook.com>
Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com>
Co-authored-by: wangmm0220 <wangmm0220@gmail.com>
Co-authored-by: Haojun Liao <hjliao@taosdata.com>
Co-authored-by: zyyang90 <zyyang@taosdata.com>
Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com>
Co-authored-by: facetosea <285808407@qq.com>
Co-authored-by: Simon Guan <guanshengliang@qq.com>
Co-authored-by: huohong <sallyhuo@taosdata.com>
Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com>
Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com>
Co-authored-by: xiao-77 <berylbao@taosdata.com>
Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com>
Co-authored-by: happyguoxy <happy_guoxy@163.com>
Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com>
Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>

* test: rename TSDB

* docs: fix rust examples (#31908)

* docs: modify rust native test case

* docs: modify rust ws test case

* docs: modify rust examples

* docs: update rust pool docs

* fix new cases

* migrate test case

* feat: support reading sub table names and tag values from CSV files to create sub tables (#31909)

* feat: add obtaining table names from tag files

* feat: add write data table control

* feat: add table params to write

* feat: delete log file

* feat: modify test case csv path

* feat: resolve memory leakage in the table building thread

* feat: resolve compilation errors

* feat: resolve table name copy len error

* feat: modify create table log level

* feat: modifying query configuration parameter array out of bounds

* feat: support custom primary key names

* feat: modify log level

* feat: add set primary key name case

* feat: add column  keywords case

* feat: add keywords case data

* feat: modify primaryKeyName value len

* feat: modify primaryKeyName value define

* feat: modify primaryKeyName value size

* fix: compile issue (#31943)

Co-authored-by: taos-support <it@taosdata.com>

* package: fix error

* package: fix error

* fix failed cases

* merge 3.0

* rename create_table_keywords.py to test_create_table_keywords.py

* fix failed cases

* fix new cases

* docs: update stream (#31957)

* docs: update jdbc out-dated descripiton (#31959)

* fix: TD-36560 refactor arbitrator group function name and log (#31852)

* feat: support BLOB data type (#31704)

* rename 0-others/mounts.py to 0-others/test_mounts.py

* fix failed cases

* docs: update gpt (#31975)

* fix failed cases

* fix failed cases

* package: fix error

* feat: add taosBenchmark command line parameters (#31967)

* feat: add command line parameters

* feat: add command line parameter test cases

* fix: tableName len error

* enh: set TD Release build in tdengine-build.yml

* Update tdengine-build.yml

* fix: add json file path log

* fix: streamline TD_CONFIG export in build steps

* fix: Restore the build configuration

---------

Co-authored-by: haoranchen <haoran920c@163.com>

* fix: tableName len error (#31977)

* fix: tableName len error

* fix: modify TD_CONFIG=Release

* fix: code format

* fix: Restore the build configuration

* enh: set TD Release build in tdengine-build.yml (#31980)

* enh: set TD Release build in tdengine-build.yml

* Update tdengine-build.yml

* fix: update cache key for externals to include debug build version

* fix: remove verbose flag from build commands in tdengine-build.yml

* skip memleak cases

* fix failed case

* fix failed cases

* package: fix error

* fix(stmt2):tbname error output (#31997)

* fix: possible memory leak (#31972)

* feat: create connect add dbname params (#32002)

* feat: create connect add dbname params

* fix: connect param error

* skip failed cases

* fix cases on windows

* fix cases

* support connect bi mode and fix log level

* unique sql connect username and password

* fix log level

* enh: mounted vnode may have no tq (#31916)

* fix: subquery memleak (#32024)

* fix failed case

* fix cases

* rename 2-query/test_insert_select.py to 2-query/test_system_insert_select.py

* skip memleak cases

* enh: rename data forecast/detect to forecasting/anomaly detection (#32021)

* package: unique product name

* package: update for main

* skip tsim cases

* chore: update jdbc connection pool validation query sql (#32056)

* refactor: quotes usage in bash scripts

Signed-off-by: WANG Xu <feici02@outlook.com>

* enhn(mqtt/rawblock): new format for msg payload (#31801)

* fix: fix broken link in 14-stream.md

* docs: 15-spark.md is missing end semicolon (#32068)

* chore: bump dev version to 3.3.7.0.alpha (#32066)

* fix: blob test (#32020)

* fix blob query error

* fix blob query error

* fix blob query error

* fix blob query error

* fix blob query error

* opt query

* opt write

* opt write

* opt write

* opt bse

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* opt write

* add cache

* opt query

* opt query

* opt bse

* add data iter

* add data iter

* add more compress

* add more compress

* add more compress

* add more compress

* add more compress

* add more compress

* opt blob transfer

* opt blob transfer

* opt blob transfer

* opt write

* avoid unordered data write

* avoid unordered data write

* opt read

* refactor log

* fix invalid write

* refactor code

* fix merge error

* fix merge error

* add error code

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* support blob type len

* Merge remote-tracking branch 'origin/3.0' into enh/blob

* refactor code

* support blob type len

* refactor code

* refactor code

* benchmark support blob type

* benchmark support blob type

* add log

* handle sort and merge row

* change file set

* change file set

* change file set

* change file set

* change file set

* change file set

* change file set

* opt code

* opt read

* add restart error and add unit test

* add restart error and add unit test

* add restart error and add unit test

* add restart error and add unit test

* add restart error and add unit test

* refactor code

* add restart error and add unit test

* opt code

* refactor code

* fix invalid write

* blob test

* blob test

* blob test

* blob test

* blob test

* support blob

* add str trim

* add str trim

* Merge remote-tracking branch 'origin/3.0' into feat/blob

* Merge remote-tracking branch 'origin/3.0' into feat/blob

* update test case

* fix invalid read

* fix invalid read

* fix invalid read

* add stmt2

* add stmt2

* add stmt2

* update parameter

* refactor test case

* refactor test case

* support blob

* support stmt2

* add sub

* support blob

* support blob

* support sub

* fix tmq crash

* support windows/darwin

* fix stmt2 bind row

* fix blob crash

* fix blob crash

* fix blob query error

* fix blob crash

* fix merge error

* refactor bse

* add blob transfer

* add blob transfer

* add transfer snapshot

* refactor code

* change log level

* change log level

* add test case

* refactor code

* revert taosBenchmark

* revert taosbench

* fix: improve error handling and encoding for file reading in grep_asserts_in_file function

* rm assert

* fix mem leak

* fix compile error

* fix conflict

* fix conflict

* fix compile error

* fix compile error

* fix pre check error

* fix pre check error

* fix compile error on windows

* fix error on window

* fix error on windows

* fix error on windows

* opt no-blob sql

* fix compile error on dawain

* fix compile error on dawain

* fix invalid read

* fix mem leak

* fix invalid read

* fix invalid read

* fix error on windows

* remove unused code

* refactor code

* fix mem leak

* fix mem leak

* rm unused code

* rm unused code

* fix invalid copy

* fix invalid copy

* fix invalid copy

* fix invalid copy

* fix invalid copy

* refactor code

* make ci happy

* refactor code

* change make

* update test case

* update ignore code

* update bse snapshot and test case

* update bse snapshot and test case

* refactor code

* change unit test

* update bse snapshot

* update bse snapshot

* fix test case

* fix bse snapshot

* fix bse snapshot

* fix snapshot transfer

* remove unused log

* support func query

* add test case

* forbidden  unsupport code

* merge 3.0

* merge 3.0

* change test case

* add forbidden code

* add forbidden code

* add code

* support length func

* support length func

* taosBenchmark support  blob

* support blob raw block

* support write raw block

* support more query

* Merge branch 'feat/blob' into feat/blob_test

* SBlobRow2

* change bse commit change

* refactor code

* rm exe test

* refactor code

* checke return code

* rename blob name

* refactor code

* refactor code

* refactor code

* refactor code

* fix unordere write

* fix unordere write

* fix row merge error

* fix row merge error

* fix row merge error

* fix row merge error

* support ordered data

* support ordered data

* Merge remote-tracking branch 'origin/3.0' into feat/blob_test

* fix stmt2 blob crash

* add not support write type

* add not support write type

* add not support write type

* rm exe

* fix col-formate error

* fix col-formate error

* fix mem leak

* refactor code

* add error code for single blob column restriction and update error message

* add error code for single blob column restriction and update error message

* add error code for single blob column restriction and update error message

* refactor code

* change error code

* make ci happy

---------

Co-authored-by: yihaoDeng <yhdeng@taosdata.com>
Co-authored-by: chenhaoran <haoran920c@163.com>

* package: update for main (#32091)

* fix compile error on windows (#32089)

* fix: source code merge issues

---------

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: WANG Xu <feici02@outlook.com>
Co-authored-by: minhuinie <nminhui@163.com>
Co-authored-by: huohong <sallyhuo@taosdata.com>
Co-authored-by: chenhaoran <haoran920c@163.com>
Co-authored-by: Nie Minhui <143420805+minhuinie@users.noreply.github.com>
Co-authored-by: Mario Peng <48949600+Pengrongkun@users.noreply.github.com>
Co-authored-by: Haojun Liao <hjxilinx@users.noreply.github.com>
Co-authored-by: dongming chen <cademfly@hotmail.com>
Co-authored-by: Linhe Huo <linhehuo@gmail.com>
Co-authored-by: huohong <346479823@qq.com>
Co-authored-by: Joel Brass <joel@jbrass.com>
Co-authored-by: WANG Xu <feici02@outlook.com>
Co-authored-by: Hongze Cheng <hzcheng@taosdata.com>
Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com>
Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
Co-authored-by: Kaili Xu <klxu@taosdata.com>
Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com>
Co-authored-by: WANG MINGMING <wangmm0220@gmail.com>
Co-authored-by: hongzhenliu <wluckyjob@gmail.com>
Co-authored-by: Daniel Clow <106956386+danielclow@users.noreply.github.com>
Co-authored-by: She Yanjie <57549981+sheyanjie-qq@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Simon Guan <slguan@taosdata.com>
Co-authored-by: plum-lihui <huili@taosdata.com>
Co-authored-by: Alex Duan <417921451@qq.com>
Co-authored-by: zelv01 <1101510017@qq.com>
Co-authored-by: Jing Sima <simondominic9997@outlook.com>
Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com>
Co-authored-by: Haojun Liao <hjliao@taosdata.com>
Co-authored-by: zyyang90 <zyyang@taosdata.com>
Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com>
Co-authored-by: facetosea <285808407@qq.com>
Co-authored-by: Simon Guan <guanshengliang@qq.com>
Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com>
Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com>
Co-authored-by: xiao-77 <berylbao@taosdata.com>
Co-authored-by: happyguoxy <happy_guoxy@163.com>
Co-authored-by: guozhenwei <2227465945@qq.com>
Co-authored-by: kevin men <men_shi_bin@163.com>
Co-authored-by: taos-support <it@taosdata.com>
Co-authored-by: Yihao Deng <luomoxyz@126.com>
Co-authored-by: Minglei Jin <49711132+stephenkgu@users.noreply.github.com>
Co-authored-by: yihaoDeng <yhdeng@taosdata.com>
2025-07-22 13:25:21 +08:00

2091 lines
61 KiB
C

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "bseTable.h"
#include "bse.h"
#include "bseCache.h"
#include "bseSnapshot.h"
#include "bseTableMgt.h"
#include "osMemPool.h"
#include "vnodeInt.h"
// Forward declarations for the file-local (static) encode/decode helpers.
// Each encode/decode pair takes the structure plus a caller-supplied char buffer.
// NOTE(review): the int32_t results are presumably status codes or byte counts;
// confirm against the definitions further down in this file.

// table footer func
static int32_t footerEncode(STableFooter *pFooter, char *buf);
static int32_t footerDecode(STableFooter *pFooter, char *buf);
// block handle func
static int32_t blkHandleEncode(SBlkHandle *pHandle, char *buf);
static int32_t blkHandleDecode(SBlkHandle *pHandle, char *buf);
// table meta func
static int32_t metaBlockEncode(SMetaBlock *pMeta, char *buf);
static int32_t metaBlockDecode(SMetaBlock *pMeta, char *buf);
// metaBlockAdd/metaBlockGet pair an SBlock with an SMetaBlock.
static int32_t metaBlockAdd(SBlock *p, SMetaBlock *pMeta);
static int32_t metaBlockGet(SBlock *p, SMetaBlock *pMeta);
// block func
static int32_t blockCreate(int32_t cap, SBlock **pBlock);
static void blockDestroy(SBlock *pBlock);
static int32_t blockPut(SBlock *pBlock, int64_t seq, uint8_t *value, int32_t len);
static int32_t blockAppendBatch(SBlock *p, uint8_t *value, int32_t len);
static int32_t blockEsimateSize(SBlock *pBlock, int32_t extra);
static void blockClear(SBlock *pBlock);
static int32_t blockSeek(SBlock *p, int64_t seq, uint8_t **pValue, int32_t *len);
static int8_t blockGetType(SBlock *p);
static int32_t blockSeekMeta(SBlock *p, int64_t seq, SMetaBlock *pMeta);
static int32_t blockGetAllMeta(SBlock *p, SArray *pResult);
static int32_t metaBlockAddIndex(SBlock *p, SBlkHandle *pInfo);
int32_t tableMetaWriterInit(SBTableMeta *pMeta, char *name, SBtableMetaWriter **ppWriter);
int32_t tableMetaWriterCommit(SBtableMetaWriter *pMeta);
void tableMetaWriterClose(SBtableMetaWriter *p);
int32_t tableMetaWriteAppendRawBlock(SBtableMetaWriter *pMeta, SBlockWrapper *pBlock, SBlkHandle *pBlkHandle);
int32_t tableMetaReaderInit(SBTableMeta *pMeta, char *name, SBtableMetaReader **ppReader);
void tableMetaReaderClose(SBtableMetaReader *p);
int32_t tableMetaReaderLoadIndex(SBtableMetaReader *p);
int32_t tableMetaOpenFile(SBtableMetaWriter *pMeta, int8_t read, char *name);
int32_t tableMetaReaderOpenIter(SBtableMetaReader *pReader, SBtableMetaReaderIter **pIter);
int32_t tableMetaReaderIterNext(SBtableMetaReaderIter *pIter, SBlockWrapper *pDataWrapper, SBlkHandle *dstHandle);
void tableMetaReaderIterClose(SBtableMetaReaderIter *p);
// STable builder func
static int32_t tableBuilderGetBlockSize(STableBuilder *p);
static int32_t tableBuilderLoadBlock(STableBuilder *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper);
static int32_t tableBuilderSeek(STableBuilder *p, SBlkHandle *pHandle, int64_t seq, uint8_t **pValue, int32_t *len);
static void tableBuilderUpdateBlockRange(STableBuilder *p, SBlockItemInfo *pInfo);
static void tableBuildUpdateTableRange(STableBuilder *p, SBlockItemInfo *pInfo);
// STable pReaderMgt func
static int32_t tableReaderInitMeta(STableReader *p, SBlock *pBlock);
static int32_t tableReaderLoadRawBlock(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper);
static int32_t tableReaderLoadRawMeta(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper);
static int32_t tableReaderLoadRawMetaIndex(STableReader *p, SBlockWrapper *blkWrapper);
static int32_t tableReaderLoadRawFooter(STableReader *p, SBlockWrapper *blkWrapper);
static int32_t tableOpenFile(char *name, int8_t read, TdFilePtr *pFile, int64_t *size);
static int32_t tableFlushBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk, int32_t *nWrite);
static int32_t tableLoadBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk);
static int32_t tableLoadRawBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk, int8_t checkSum);
/*---block format----*/
//---datatype--|---len---|--data---|--rawdatasize---|--compressType---|---checksum---|
//
// The trailer (raw size: int32_t, compress type: int8_t, checksum: TSCKSUM) is
// located right after the SBlock header plus `len` payload bytes. All offsets
// below are byte offsets from the start of the SBlock.
#define BLOCK_ROW_SIZE_OFFSET(p)      (sizeof(SBlock) + (p)->len)
// Raw (header + payload) size of the block; numerically equal to the offset of
// the raw-size field.
#define BLOCK_ROW_SIZE(p)             BLOCK_ROW_SIZE_OFFSET(p)
#define BLOCK_COMPRESS_TYPE_OFFSET(p) (BLOCK_ROW_SIZE_OFFSET(p) + sizeof(int32_t))
#define BLOCK_CHECKSUM_OFFSET(p)      (BLOCK_COMPRESS_TYPE_OFFSET(p) + sizeof(int8_t))
#define BLOCK_TOTAL_SIZE(p)           (BLOCK_CHECKSUM_OFFSET(p) + sizeof(TSCKSUM))

// Accessors for the trailer fields. The expansions are fully parenthesized so
// they compose safely inside larger expressions.
#define BLOCK_SET_ROW_SIZE(p, size)      (*(int32_t *)((char *)(p) + BLOCK_ROW_SIZE_OFFSET(p)) = (size))
#define BLOCK_GET_ROW_SIZE(p)            (*(int32_t *)((char *)(p) + BLOCK_ROW_SIZE_OFFSET(p)))
#define BLOCK_SET_COMPRESS_TYPE(p, type) (*(int8_t *)((char *)(p) + BLOCK_COMPRESS_TYPE_OFFSET(p)) = (type))
#define BLOCK_GET_COMPRESS_TYPE(p)       (*(int8_t *)((char *)(p) + BLOCK_COMPRESS_TYPE_OFFSET(p)))

// Size of the trailer: raw size + compress type + checksum.
#define BLOCK_TAIL_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(TSCKSUM))

// Write the raw length and compress type right after `len` bytes of (compressed)
// data starting at `p`. No trailing semicolon: the caller supplies it, so the
// macro behaves like a single statement (e.g. inside if/else).
#define COMREPSS_DATA_SET_TYPE_AND_RAWLEN(p, len, type, rawLen)  \
  do {                                                           \
    *(int32_t *)((char *)(p) + (len)) = (rawLen);                \
    *(int8_t *)((char *)(p) + (len) + sizeof(int32_t)) = (type); \
  } while (0)

// Read the raw length and compress type from the trailer of a stored block of
// `len` total bytes (trailer included) starting at `p`.
#define COMPRESS_DATA_GET_TYPE_AND_RAWLEN(p, len, type, rawLen)                   \
  do {                                                                            \
    (rawLen) = *(int32_t *)((char *)(p) + (len) - BLOCK_TAIL_LEN);                \
    (type) = *(int8_t *)((char *)(p) + (len) - BLOCK_TAIL_LEN + sizeof(int32_t)); \
  } while (0)
/*
 * Load the on-disk block described by `pHandle` from the builder's data file
 * and look up `seq` in it.
 *
 * On success `*pValue` / `*len` are filled by blockSeek() (a newly allocated
 * copy of the value). The temporary block buffer is released on every path.
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t tableBuilderSeek(STableBuilder *p, SBlkHandle *pHandle, int64_t seq, uint8_t **pValue, int32_t *len) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SBlockWrapper blockWrapper = {0};

  code = tableBuilderLoadBlock(p, pHandle, &blockWrapper);
  TSDB_CHECK_CODE(code, lino, _error);

  code = blockSeek(blockWrapper.data, seq, pValue, len);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    // fixed typo in the message: "ince" -> "since"
    bseError("failed to seek data from table builder at lino %d since %s", lino, tstrerror(code));
  }
  blockWrapperCleanup(&blockWrapper);
  return code;
}
/*
 * Allocate `pBlkWrapper` (pHandle->size bytes) and fill it with the block that
 * `pHandle` points to in the builder's data file.
 *
 * The wrapper is NOT released here on failure; the caller owns it and must call
 * blockWrapperCleanup() in either case.
 */
int32_t tableBuilderLoadBlock(STableBuilder *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper) {
  int32_t code = 0;
  int32_t lino = 0;

  code = blockWrapperInit(pBlkWrapper, pHandle->size);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadBlock(p->pDataFile, pHandle, pBlkWrapper);
  // record the failing line for the load step as well, so the log is not "lino 0"
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to load block from table builder at lino %d since %s", lino, tstrerror(code));
  }
  return code;
}
/*
 * Create a table builder for retention timestamp `ts` and open its data file.
 *
 * `*pBuilder` is written only on success. On any failure the partially built
 * object is released with tableBuilderClose() and the error code is returned.
 */
int32_t tableBuilderOpen(int64_t ts, STableBuilder **pBuilder, SBse *pBse) {
  int32_t code = 0;
  int32_t lino = 0;
  char    name[TSDB_FILENAME_LEN] = {0};
  char    path[TSDB_FILENAME_LEN] = {0};

  bseBuildDataName(ts, name);
  bseBuildFullName(pBse, name, path);

  STableBuilder *p = taosMemoryCalloc(1, sizeof(STableBuilder));
  if (p == NULL) {
    // propagate the allocation error; previously `code` stayed 0 and the
    // function reported success without producing a builder
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  p->retentionTs = ts;
  // p was zero-allocated, so the copied name stays NUL-terminated
  memcpy(p->name, name, strlen(name));

  p->pMetaHandle = taosArrayInit(128, sizeof(SBlkHandle));
  if (p->pMetaHandle == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  p->blockCap = BSE_GET_BLOCK_SIZE(pBse);
  code = blockWrapperInit(&p->pBlockWrapper, p->blockCap);
  TSDB_CHECK_CODE(code, lino, _error);

  p->compressType = BSE_GET_COMPRESS_TYPE(pBse);
  seqRangeReset(&p->tableRange);
  seqRangeReset(&p->blockRange);
  p->pBse = pBse;

  code = tableOpenFile(path, 0, &p->pDataFile, &p->offset);
  TSDB_CHECK_CODE(code, lino, _error);

  // publish the builder only after every step succeeded, so the caller never
  // receives a pointer that is freed below
  *pBuilder = p;

_error:
  if (code != 0) {
    (void)tableBuilderClose(p, 0);
    bseError("failed to open table builder at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
/*
 * Build an array of SMetaBlock entries, one per block handle currently recorded
 * in p->pMetaHandle (same order).
 *
 * On success `*pMetaBlock` receives the new array (caller destroys it) and 0 is
 * returned; on allocation failure `terrno` is returned and nothing is stored.
 */
int32_t tableBuilderGetMetaBlock(STableBuilder *p, SArray **pMetaBlock) {
  SArray *pResult = taosArrayInit(8, sizeof(SMetaBlock));
  if (pResult == NULL) {
    return terrno;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(p->pMetaHandle)) {
    SBlkHandle *pHandle = taosArrayGet(p->pMetaHandle, idx);

    // fields not listed here are zero-initialized
    SMetaBlock entry = {.type = BSE_TABLE_META_TYPE,
                        .version = BSE_DATA_VER,
                        .range = pHandle->range,
                        .offset = pHandle->offset,
                        .size = pHandle->size};

    if (taosArrayPush(pResult, &entry) == NULL) {
      taosArrayDestroy(pResult);
      return terrno;
    }
    idx++;
  }

  *pMetaBlock = pResult;
  return 0;
}
/*
 * Write the builder's pending in-memory block to the data file at p->offset.
 *
 * Steps: stamp the block type and trailer, optionally compress (falling back to
 * the uncompressed image if compression fails), append the checksum, write the
 * bytes, advance p->offset, record the block handle in p->pMetaHandle and reset
 * the per-block sequence range.
 *
 * An empty block is a no-op that returns 0. Otherwise the in-memory block is
 * cleared before returning, on success and on failure alike.
 */
int32_t tableBuilderFlush(STableBuilder *p, int8_t type) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SBlockWrapper packed = {0};
  SBlock       *pBlk = p->pBlockWrapper.data;

  if (pBlk->len == 0) {
    return 0;
  }

  int8_t   compressType = BSE_GET_COMPRESS_TYPE(p->pBse);
  uint8_t *pOut = (uint8_t *)pBlk;
  int32_t  outLen = BLOCK_TOTAL_SIZE(pBlk);

  pBlk->type = type;
  BLOCK_SET_COMPRESS_TYPE(pBlk, compressType);
  BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));

  if (compressType != kNoCompres) {
    code = blockWrapperInit(&packed, outLen + 16);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t packedLen = packed.cap;
    code = bseCompressData(compressType, pOut, BLOCK_ROW_SIZE(pBlk), packed.data, &packedLen);
    if (code == 0) {
      // compressed image: payload followed by raw length, type and checksum
      int32_t rawLen = BLOCK_ROW_SIZE(pBlk);
      COMREPSS_DATA_SET_TYPE_AND_RAWLEN(packed.data, packedLen, compressType, rawLen);
      outLen = packedLen + BLOCK_TAIL_LEN;
      pOut = (uint8_t *)packed.data;
    } else {
      // compression failed: keep writing the uncompressed block instead
      bseWarn("failed to compress data since %s, not set compress", tstrerror(TSDB_CODE_THIRDPARTY_ERROR));
      blockWrapperCleanup(&packed);
      BLOCK_SET_COMPRESS_TYPE(pBlk, kNoCompres);
      BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
    }
  }

  code = taosCalcChecksumAppend(0, pOut, outLen);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlkHandle handle = {.size = outLen, .offset = p->offset, .range = p->blockRange};
  bseDebug("bse flush at offset %" PRId64 " len: %d, block range sseq:%" PRId64 ", eseq:%" PRId64 "", p->offset, outLen,
           handle.range.sseq, handle.range.eseq);

  if (taosLSeekFile(p->pDataFile, handle.offset, SEEK_SET) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  if (taosWriteFile(p->pDataFile, pOut, outLen) != outLen) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  p->offset += outLen;

  if (taosArrayPush(p->pMetaHandle, &handle) == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  seqRangeReset(&p->blockRange);

_error:
  if (code != 0) {
    bseError("failed to flush table builder at line %d since %s", lino, tstrerror(code));
  }
  blockWrapperClear(&p->pBlockWrapper);
  blockWrapperCleanup(&packed);
  return code;
}
/* Fold the sequence number of `pInfo` into the builder's table-wide range. */
void tableBuildUpdateTableRange(STableBuilder *p, SBlockItemInfo *pInfo) {
  SSeqRange single = {0};
  single.sseq = pInfo->seq;
  single.eseq = pInfo->seq;
  seqRangeUpdate(&p->tableRange, &single);
}
/* Fold the sequence number of `pInfo` into the range of the pending block. */
void tableBuilderUpdateBlockRange(STableBuilder *p, SBlockItemInfo *pInfo) {
  SSeqRange single = {0};
  single.sseq = pInfo->seq;
  single.eseq = pInfo->seq;
  seqRangeUpdate(&p->blockRange, &single);
}
/*|seq len value|seq len value| seq len value| seq len value|*/
/*
 * Append an already-encoded batch to the builder.
 *
 * pBatch->pSeq holds one SBlockItemInfo per record (its `size` is the encoded
 * size inside pBatch->buf). Records are copied into the pending block in runs;
 * whenever the next record would reach the block size limit, the run collected
 * so far is appended and the block is flushed.
 *
 * A single record that does not fit even into an empty block used to make this
 * loop spin forever (flushing an empty block is a no-op and the index never
 * advanced). Such a record is now accepted by growing the block buffer; the
 * oversized block is flushed as soon as the next record (or call) arrives.
 */
int32_t tableBuilderPutBatch(STableBuilder *p, SBseBatch *pBatch) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t len = 0, offset = 0;
  int32_t nItems = taosArrayGetSize(pBatch->pSeq);

  for (int32_t i = 0; i < nItems;) {
    SBlockItemInfo *pInfo = taosArrayGet(pBatch->pSeq, i);
    if (i == 0 || i == nItems - 1) {
      tableBuildUpdateTableRange(p, pInfo);
    }

    if (blockEsimateSize(p->pBlockWrapper.data, len + pInfo->size) < tableBuilderGetBlockSize(p)) {
      // the record still fits: extend the current run
      i++;
      len += pInfo->size;
      tableBuilderUpdateBlockRange(p, pInfo);
      continue;
    }

    SBlock *pBlk = p->pBlockWrapper.data;
    if (len == 0 && pBlk->len == 0) {
      // nothing pending and the block is empty: this record alone exceeds the
      // block size. Grow the buffer so header + record + trailer fit.
      code = blockWrapperResize(&p->pBlockWrapper, blockEsimateSize(pBlk, pInfo->size));
      TSDB_CHECK_CODE(code, lino, _error);

      i++;
      len += pInfo->size;
      tableBuilderUpdateBlockRange(p, pInfo);
      continue;
    }

    if (len > 0) {
      offset += blockAppendBatch(p->pBlockWrapper.data, pBatch->buf + offset, len);
    }
    code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE);
    TSDB_CHECK_CODE(code, lino, _error);
    len = 0;
  }

  // copy whatever part of the batch has not been appended yet
  if (offset < pBatch->len) {
    blockAppendBatch(p->pBlockWrapper.data, pBatch->buf + offset, pBatch->len - offset);
  }

_error:
  if (code != 0) {
    bseError("failed to append batch since %s", tstrerror(code));
  }
  return code;
}
/*
 * Truncate the builder's data file to `size` bytes.
 * Returns TSDB_CODE_INVALID_PARA when no data file is open, otherwise the
 * result of taosFtruncateFile().
 */
int32_t tableBuilderTruncFile(STableBuilder *p, int64_t size) {
  if (p->pDataFile == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t rc = taosFtruncateFile(p->pDataFile, size);
  if (rc != 0) {
    bseError("failed to truncate file since %s", tstrerror(rc));
  }
  return rc;
}
/*
 * Append a single (seq, value) record to the pending block, flushing the block
 * first when the record would reach the block size limit.
 *
 * Fixes relative to the previous version:
 *  - blockPut() returns the number of bytes written, not an error code; its
 *    positive result was treated as a failure, so every call reported an error.
 *  - the pending block's sequence range is now updated, as the batch path does.
 *  - the block buffer is grown when one record is larger than the block, so
 *    blockPut() cannot write past the buffer.
 */
int32_t tableBuilderPut(STableBuilder *p, int64_t *seq, uint8_t *value, int32_t len) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SBlockItemInfo info = {.size = len, .seq = *seq};

  tableBuildUpdateTableRange(p, &info);

  // seqlen + valuelen + value
  int32_t extra = sizeof(*seq) + len + sizeof(len);
  if (blockEsimateSize(p->pBlockWrapper.data, extra) >= tableBuilderGetBlockSize(p)) {
    code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE);
    TSDB_CHECK_CODE(code, lino, _error);
  }

  // no-op when the record fits; grows the buffer for an oversized record
  code = blockWrapperResize(&p->pBlockWrapper, blockEsimateSize(p->pBlockWrapper.data, extra));
  TSDB_CHECK_CODE(code, lino, _error);

  // must happen after the flush above, which resets the block range
  tableBuilderUpdateBlockRange(p, &info);

  (void)blockPut(p->pBlockWrapper.data, *seq, value, len);

_error:
  if (code != 0) {
    bseError("failed to put value by seq %" PRId64 " at line %d since %s", *seq, lino, tstrerror(code));
  }
  return code;
}
/*
 * Three-way comparison of two SBlkHandle entries by the start sequence of
 * their range: returns 1, -1 or 0.
 */
int32_t compareFunc(const void *pLeft, const void *pRight) {
  const SBlkHandle *lhs = (const SBlkHandle *)pLeft;
  const SBlkHandle *rhs = (const SBlkHandle *)pRight;

  return (lhs->range.sseq > rhs->range.sseq) - (lhs->range.sseq < rhs->range.sseq);
}
/*
 * Search `pMetaHandle` with compareFunc() and the TD_LE flag for a probe whose
 * range is [seq, seq]; returns whatever index taosArraySearchIdx() yields.
 */
int32_t findTargetBlock(SArray *pMetaHandle, int64_t seq) {
  SBlkHandle probe = {0};
  probe.range.sseq = seq;
  probe.range.eseq = seq;
  return taosArraySearchIdx(pMetaHandle, &probe, compareFunc, TD_LE);
}
/*
 * Look up `seq` in a table that is still being built.
 *
 * If blocks were already flushed and `seq` is not beyond the last flushed
 * block's range, the matching block is located through p->pMetaHandle and read
 * back from the data file. In every other case the pending in-memory block is
 * searched. Returns TSDB_CODE_NOT_FOUND for a NULL builder or when no flushed
 * block matches.
 */
int32_t tableBuilderGet(STableBuilder *p, int64_t seq, uint8_t **value, int32_t *len) {
  if (p == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  if (taosArrayGetSize(p->pMetaHandle) > 0) {
    SBlkHandle *pLast = taosArrayGetLast(p->pMetaHandle);
    if (!seqRangeIsGreater(&pLast->range, seq)) {
      int32_t idx = findTargetBlock(p->pMetaHandle, seq);
      if (idx < 0) {
        return TSDB_CODE_NOT_FOUND;
      }
      SBlkHandle *pTarget = taosArrayGet(p->pMetaHandle, idx);
      return tableBuilderSeek(p, pTarget, seq, value, len);
    }
  }

  // nothing flushed yet, or seq is newer than everything flushed
  return blockSeek(p->pBlockWrapper.data, seq, value, len);
}
/* Fold the range of every meta block in `pMetaBlock` into pTableMeta->range. */
static void updateTableRange(SBTableMeta *pTableMeta, SArray *pMetaBlock) {
  if (pMetaBlock != NULL) {
    int32_t idx = 0;
    while (idx < taosArrayGetSize(pMetaBlock)) {
      SMetaBlock *pEntry = taosArrayGet(pMetaBlock, idx);
      seqRangeUpdate(&pTableMeta->range, &pEntry->range);
      idx++;
    }
  }
}
/*
 * Commit the table being built: flush the pending block, fsync the data file,
 * hand the list of meta blocks to tableMetaCommit(), update the table range and
 * fill `pInfo` (level, range, retentionTs, size) for the caller.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL builder, 0 on success, otherwise
 * the error code of the failing step.
 */
int32_t tableBuilderCommit(STableBuilder *p, SBseLiveFileInfo *pInfo) {
  if (p == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  SArray *pBlocks = NULL;

  code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE);
  TSDB_CHECK_CODE(code, lino, _error);

  code = taosFsyncFile(p->pDataFile);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableBuilderGetMetaBlock(p, &pBlocks);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaCommit(p->pTableMeta, pBlocks);
  TSDB_CHECK_CODE(code, lino, _error);

  updateTableRange(p->pTableMeta, pBlocks);

  pInfo->level = 0;
  pInfo->range = p->pTableMeta->range;
  pInfo->retentionTs = p->retentionTs;
  pInfo->size = p->offset;

_error:
  if (code == 0) {
    bseInfo("succ to commit table %s", p->name);
  } else {
    bseError("failed to commit table builder at line %d since %s ", lino, tstrerror(code));
  }
  taosArrayDestroy(pBlocks);
  return code;
}
/* Return the block size limit (p->blockCap) configured for this builder. */
int32_t tableBuilderGetBlockSize(STableBuilder *p) {
  int32_t limit = p->blockCap;
  return limit;
}
/*
 * Release a table builder: the pending block buffer, the data file handle, the
 * block handle array and the builder itself. A NULL builder is ignored.
 * `commited` is currently not used.
 */
void tableBuilderClose(STableBuilder *p, int8_t commited) {
  (void)commited;

  if (p != NULL) {
    blockWrapperCleanup(&p->pBlockWrapper);
    taosCloseFile(&p->pDataFile);
    taosArrayDestroy(p->pMetaHandle);
    taosMemoryFree(p);
  }
}
/*
 * Fill the SBseSnapMeta header located at the start of pBlkWrapper->data with
 * the given range, file type, block type and keep days.
 */
static void addSnapshotMetaToBlock(SBlockWrapper *pBlkWrapper, SSeqRange range, int8_t fileType, int8_t blockType,
                                   int32_t keepDays) {
  SBseSnapMeta *pHdr = pBlkWrapper->data;

  pHdr->keepDays = keepDays;
  pHdr->blockType = blockType;
  pHdr->fileType = fileType;
  pHdr->range = range;
}
/*
 * Overwrite only the keepDays field of the SBseSnapMeta header at the start of
 * pBlkWrapper->data. `range`, `fileType` and `blockType` are accepted but not
 * used.
 */
static void updateSnapshotMeta(SBlockWrapper *pBlkWrapper, SSeqRange range, int8_t fileType, int8_t blockType,
                               int32_t keepDays) {
  (void)range;
  (void)fileType;
  (void)blockType;

  ((SBseSnapMeta *)pBlkWrapper->data)->keepDays = keepDays;
}
/*
 * Read the raw (still encoded) data block described by `pHandle` into
 * `blkWrapper`, leaving room for an SBseSnapMeta header in front of it, verify
 * its checksum and fill that header (data snapshot, data block, keepDays 365).
 */
int32_t tableReaderLoadRawBlock(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper) {
  int32_t lino = 0;
  int32_t code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadRawBlock(p->pDataFile, pHandle, blkWrapper, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_SNAP, BSE_TABLE_DATA_TYPE, 365);

_error:
  if (code != 0) {
    bseError("table reader failed to load block at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
/*
 * Read the raw meta block described by `pHandle` from the meta reader's file
 * into `blkWrapper`, leaving room for an SBseSnapMeta header in front of it,
 * verify its checksum and fill that header (meta snapshot, meta block,
 * keepDays 365).
 */
int32_t tableReaderLoadRawMeta(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper) {
  int32_t            code = 0;
  int32_t            lino = 0;
  SBtableMetaReader *pReader = p->pMetaReader;

  code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadRawBlock(pReader->pFile, pHandle, blkWrapper, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_META_TYPE, 365);

_error:
  if (code != 0) {
    // message cleaned up: it used to read "at line %d lino since"
    bseError("failed to load raw meta from table reader at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
/*
 * Read the raw block referenced by the footer's metaHandle from the meta
 * reader's file into `blkWrapper`, leaving room for an SBseSnapMeta header in
 * front of it, verify its checksum and fill that header (meta snapshot, meta
 * index block, keepDays 365).
 */
int32_t tableReaderLoadRawMetaIndex(STableReader *p, SBlockWrapper *blkWrapper) {
  int32_t            code = 0;
  int32_t            lino = 0;
  SBtableMetaReader *pReader = p->pMetaReader;
  SBlkHandle        *pHandle = p->pMetaReader->footer.metaHandle;

  code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadRawBlock(pReader->pFile, pHandle, blkWrapper, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_META_INDEX_TYPE, 365);

_error:
  if (code != 0) {
    // distinct message so this failure is not confused with tableReaderLoadRawMeta
    bseError("failed to load raw meta index from table reader at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
/*
 * Produce the raw footer of the meta file for snapshotting: the last
 * kEncodeLen bytes of the meta reader's file are read and stored in
 * `blkWrapper` behind an SBseSnapMeta header (meta snapshot, footer block,
 * keepDays 365). blkWrapper->size is set to header + footer length.
 *
 * NOTE(review): the buffer is first filled by footerEncode() and then fully
 * overwritten by the file read when that read succeeds.
 */
int32_t tableReaderLoadRawFooter(STableReader *p, SBlockWrapper *blkWrapper) {
  int32_t            code = 0;
  int32_t            lino = 0;
  char               footer[kEncodeLen] = {0};
  SBtableMetaReader *pMetaReader = p->pMetaReader;

  code = footerEncode(&pMetaReader->footer, footer);

  int32_t footerLen = sizeof(footer);

  if (taosLSeekFile(pMetaReader->pFile, -kEncodeLen, SEEK_END) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  if (taosReadFile(pMetaReader->pFile, footer, sizeof(footer)) != footerLen) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  code = blockWrapperResize(blkWrapper, footerLen + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  // payload goes right behind the snapshot header
  memcpy((uint8_t *)blkWrapper->data + sizeof(SBseSnapMeta), footer, sizeof(footer));
  blkWrapper->size = footerLen + sizeof(SBseSnapMeta);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_FOOTER_TYPE, 365);

_error:
  if (code != 0) {
    bseError("failed to load raw footer from table pReaderMgt at lino %d since %s", lino, tstrerror(code));
  }
  return code;
}
/*
 * Open a table reader for retention timestamp `retentionTs`: open the data
 * file, allocate the reader's block buffer and initialize its meta reader.
 *
 * `pReaderMgt` must be a non-NULL STableReaderMgt; TSDB_CODE_INVALID_PARA is
 * returned otherwise (the previous code had an empty NULL check and then
 * dereferenced the pointer). `*pReader` is written only on success; on failure
 * the partially built reader is released with tableReaderClose().
 */
int32_t tableReaderOpen(int64_t retentionTs, STableReader **pReader, void *pReaderMgt) {
  char    data[TSDB_FILENAME_LEN] = {0};
  char    meta[TSDB_FILENAME_LEN] = {0};
  char    dataPath[TSDB_FILENAME_LEN] = {0};
  int32_t code = 0;
  int32_t lino = 0;

  STableReaderMgt *pMgt = (STableReaderMgt *)pReaderMgt;
  if (pMgt == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SSubTableMgt *pMeta = pMgt->pMgt;

  STableReader *p = taosMemCalloc(1, sizeof(STableReader));
  if (p == NULL) {
    // propagate the allocation error; previously `code` stayed 0 and the
    // function reported success without producing a reader
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  p->blockCap = 1024;
  p->pReaderMgt = pReaderMgt;

  bseBuildDataName(retentionTs, data);
  // p was zero-allocated, so the copied name stays NUL-terminated
  memcpy(p->name, data, strlen(data));
  bseBuildFullName(pMgt->pBse, data, dataPath);

  code = tableOpenFile(dataPath, 1, &p->pDataFile, &p->fileSize);
  TSDB_CHECK_CODE(code, lino, _error);

  code = blockWrapperInit(&p->blockWrapper, 1024);
  TSDB_CHECK_CODE(code, lino, _error);

  bseBuildMetaName(retentionTs, meta);
  code = tableMetaReaderInit(pMeta->pTableMetaMgt->pTableMeta, meta, &p->pMetaReader);
  TSDB_CHECK_CODE(code, lino, _error);

  *pReader = p;

_error:
  if (code != 0) {
    tableReaderClose(p);
    bseError("failed to open table pReaderMgt at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
/* Store the `cache` flag in the reader's putInCache field. */
void tableReaderShouldPutToCache(STableReader *p, int8_t cache) {
  p->putInCache = cache;
}
/*
 * Look up `seq` through a table reader.
 *
 * The block that covers `seq` is resolved via the meta reader, then taken from
 * the block cache or, on a cache miss, loaded from the data file and offered
 * to the cache. On success `*pValue` / `*len` receive a newly allocated copy
 * of the value.
 *
 * Fixes relative to the previous version:
 *  - the cache item reference is released on every path (it leaked when
 *    blockSeek() failed, e.g. with TSDB_CODE_NOT_FOUND);
 *  - a failed buffer allocation on the cache-miss path is reported instead of
 *    being ignored.
 */
int32_t tableReaderGet(STableReader *p, int64_t seq, uint8_t **pValue, int32_t *len) {
  int32_t            lino = 0;
  int32_t            code = 0;
  SMetaBlock         block = {0};
  SBlockWrapper      wrapper = {0};
  SCacheItem        *pItem = NULL;
  STableReaderMgt   *pMgt = (STableReaderMgt *)p->pReaderMgt;
  SBtableMetaReader *pMeta = p->pMetaReader;

  code = tableMetaReaderLoadBlockMeta(pMeta, seq, &block);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlkHandle blkhandle = {.offset = block.offset, .size = block.size, .range = block.range};

  code = blockCacheGet(pMgt->pBlockCache, &blkhandle.range, (void **)&pItem);
  if (code != 0) {
    // cache miss: read the block from the data file
    code = blockWrapperInit(&wrapper, block.size + 16);
    TSDB_CHECK_CODE(code, lino, _error);

    bseDebug("block size:%" PRId64 ", offset:%" PRId64 ", [sseq:%" PRId64 ", eseq:%" PRId64 "]", block.size,
             block.offset, block.range.sseq, block.range.eseq);

    code = tableLoadBlock(p->pDataFile, &blkhandle, &wrapper);
    if (code != 0) {
      blockWrapperCleanup(&wrapper);
      TSDB_CHECK_CODE(code, lino, _error);
    }

    // tableLoadBlock may have replaced the buffer, so read wrapper.data afterwards.
    // NOTE(review): the result of blockCachePut is overwritten below, as before;
    // who owns the block when the put fails is not visible here — confirm.
    SBlock *pBlock = wrapper.data;
    code = blockCachePut(pMgt->pBlockCache, &block.range, pBlock);
  } else {
    wrapper.data = pItem->pItem;
    wrapper.pCachItem = pItem;
  }

  code = blockSeek(wrapper.data, seq, pValue, len);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (wrapper.pCachItem != NULL) {
    bseCacheUnrefItem(wrapper.pCachItem);
  }
  if (code != 0) {
    bseError("failed to get table reader data at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
/*
 * Collect all data block handles known to the reader's meta reader.
 *
 * On success `*pMeta` receives a new SArray of SBlkHandle owned by the caller.
 * On failure nothing is stored and the temporary array is destroyed (it used
 * to leak when loading the handles failed).
 */
int32_t tableReaderGetMeta(STableReader *p, SArray **pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  SArray *pMetaHandle = taosArrayInit(128, sizeof(SBlkHandle));
  if (pMetaHandle == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  code = tableMetaReaderLoadAllDataHandle(p->pMetaReader, pMetaHandle);
  TSDB_CHECK_CODE(code, lino, _error);

  *pMeta = pMetaHandle;

_error:
  if (code != 0) {
    bseError("failed to get table reader meta at lino %d since %s", lino, tstrerror(code));
    taosArrayDestroy(pMetaHandle);
  }
  return code;
}
/*
 * Release a table reader: its handle array, data file, meta reader, block
 * buffer and the reader itself. A NULL reader is ignored.
 */
void tableReaderClose(STableReader *p) {
  if (p != NULL) {
    taosArrayDestroy(p->pMetaHandle);
    taosCloseFile(&p->pDataFile);
    tableMetaReaderClose(p->pMetaReader);
    blockWrapperCleanup(&p->blockWrapper);
    taosMemoryFree(p);
  }
}
/*
 * Allocate a zero-filled buffer of `cap` bytes and return it through `*p` as
 * an SBlock. Returns 0 on success, `terrno` when the allocation fails.
 */
int32_t blockCreate(int32_t cap, SBlock **p) {
  SBlock *pNew = taosMemCalloc(1, cap);
  if (pNew != NULL) {
    *p = pNew;
    return 0;
  }
  return terrno;
}
/*
 * Size the block would have on disk (header + current payload + trailer) if
 * `extra` more payload bytes were added.
 */
int32_t blockEsimateSize(SBlock *p, int32_t extra) {
  // block len + TSCHSUM + len + type;
  int32_t estimated = BLOCK_TOTAL_SIZE(p) + extra;
  return estimated;
}
/*
 * Copy `len` already-encoded bytes to the end of the block payload and advance
 * p->len. Returns `len`. No capacity check is done here.
 */
int32_t blockAppendBatch(SBlock *p, uint8_t *value, int32_t len) {
  uint8_t *tail = (uint8_t *)p->data + p->len;

  memcpy(tail, value, len);
  p->len += len;
  return len;
}
/*
 * Append one record to the block payload: variant-encoded `seq`,
 * variant-encoded `len`, then the `len` value bytes.
 *
 * Returns the number of bytes appended (this is a byte count, not an error
 * code). No capacity check is done here.
 *
 * Fix: p->len is advanced by the full encoded record size. It used to grow by
 * the value length only, so the next record overwrote the tail of this one and
 * blockSeek() stopped scanning too early.
 */
int32_t blockPut(SBlock *p, int64_t seq, uint8_t *value, int32_t len) {
  uint8_t *data = (uint8_t *)p->data + p->len;

  int32_t written = taosEncodeVariantI64((void **)&data, seq);
  written += taosEncodeVariantI32((void **)&data, len);
  written += taosEncodeBinary((void **)&data, value, len);

  p->len += written;
  return written;
}
/* Reset a block to empty: zero type and length, and zero the first payload byte. */
void blockClear(SBlock *p) {
  p->type = 0;
  p->len = 0;
  p->data[0] = 0;
}
/*
 * Scan the records of a data block (|seq|len|value| ...) for `seq`.
 *
 * On a match a copy of the value is allocated and returned through `*pValue`
 * (the caller frees it) with its length in `*len`, and 0 is returned.
 * Returns TSDB_CODE_NOT_FOUND when no record matches, or `terrno` when the
 * copy cannot be allocated (previously a failed allocation was passed straight
 * to memcpy).
 */
int32_t blockSeek(SBlock *p, int64_t seq, uint8_t **pValue, int32_t *len) {
  uint8_t *start = (uint8_t *)p->data;
  uint8_t *cur = start;

  while (cur - start < p->len) {
    int64_t k;
    int32_t v;
    cur = taosDecodeVariantI64(cur, &k);
    cur = taosDecodeVariantI32(cur, &v);

    if (k == seq) {
      uint8_t *copy = taosMemCalloc(1, v);
      if (copy == NULL && v > 0) {
        return terrno;
      }
      if (v > 0) {
        memcpy(copy, cur, v);
      }
      *pValue = copy;
      *len = v;
      return 0;
    }

    cur += v;  // skip this record's value
  }

  return TSDB_CODE_NOT_FOUND;
}
/* Return the type tag stored in the block header. */
int8_t blockGetType(SBlock *p) {
  int8_t type = p->type;
  return type;
}
/* Free a block via taosMemoryFree(). */
void blockDestroy(SBlock *pBlock) {
  taosMemoryFree(pBlock);
}
/*
 * Encode the block handle `pInfo` at the end of the block payload and advance
 * p->len by the encoded size, which is also returned.
 */
int32_t metaBlockAddIndex(SBlock *p, SBlkHandle *pInfo) {
  char   *tail = (char *)((uint8_t *)p->data + p->len);
  int32_t written = blkHandleEncode(pInfo, tail);

  p->len += written;
  return written;
}
/*
 * Serialize a block handle into `buf` as four variant-length integers:
 * offset, size, range.sseq, range.eseq. Returns the number of bytes written.
 */
int32_t blkHandleEncode(SBlkHandle *pHandle, char *buf) {
  void   *cursor = buf;
  int32_t total = taosEncodeVariantU64(&cursor, pHandle->offset);

  total += taosEncodeVariantU64(&cursor, pHandle->size);
  total += taosEncodeVariantI64(&cursor, pHandle->range.sseq);
  total += taosEncodeVariantI64(&cursor, pHandle->range.eseq);
  return total;
}
/*
 * Deserialize a block handle from `buf` (offset, size, range.sseq, range.eseq
 * as variant-length integers). Returns the number of bytes consumed.
 */
int32_t blkHandleDecode(SBlkHandle *pHandle, char *buf) {
  char *cursor = buf;

  cursor = taosDecodeVariantU64(cursor, &pHandle->offset);
  cursor = taosDecodeVariantU64(cursor, &pHandle->size);
  cursor = taosDecodeVariantI64(cursor, &pHandle->range.sseq);
  cursor = taosDecodeVariantI64(cursor, &pHandle->range.eseq);

  return cursor - buf;
}
// | meta handle | index handle | padding | magic number high | magic number low |
/*
 * Serialize the table footer into `buf` (kEncodeLen bytes): the meta handle and
 * the index handle at the front, and the magic number written twice in the last
 * 8 bytes. Always returns 0.
 */
int32_t footerEncode(STableFooter *pFooter, char *buf) {
  int32_t used = blkHandleEncode(pFooter->metaHandle, buf);
  used += blkHandleEncode(pFooter->indexHandle, buf + used);
  (void)used;

  char *magic = buf + kEncodeLen - 8;
  taosEncodeFixedU32((void **)&magic, kMagicNum);
  taosEncodeFixedU32((void **)&magic, kMagicNum);
  return 0;
}
/*
 * Parse a table footer from `buf` (kEncodeLen bytes).
 *
 * The two magic numbers in the last 8 bytes are verified first; then the meta
 * handle and the index handle are decoded from the front of the buffer.
 * Returns 0 on success, TSDB_CODE_FILE_CORRUPTED on a magic mismatch or when a
 * decode step reports a negative length.
 */
int32_t footerDecode(STableFooter *pFooter, char *buf) {
  char    *magic = buf + kEncodeLen - 8;
  uint32_t first, second;

  taosDecodeFixedU32(magic, &first);
  taosDecodeFixedU32(magic + 4, &second);
  if (!(first == kMagicNum && second == kMagicNum)) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  int32_t consumed = blkHandleDecode(pFooter->metaHandle, buf);
  if (consumed < 0) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  consumed = blkHandleDecode(pFooter->indexHandle, buf + consumed);
  if (consumed < 0) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  return 0;
}
/*
 * Scan the meta entries stored in `pBlock` for the first one whose range
 * contains `seq` and copy it into `pMeta`.
 * Returns 0 when found, TSDB_CODE_NOT_FOUND otherwise.
 */
int32_t blockSeekMeta(SBlock *pBlock, int64_t seq, SMetaBlock *pMeta) {
  uint8_t *cursor = (uint8_t *)pBlock->data;

  for (int32_t consumed = 0; consumed < pBlock->len;) {
    SMetaBlock entry = {0};
    int32_t    step = metaBlockDecode(&entry, (char *)cursor);

    if (seqRangeContains(&entry.range, seq)) {
      memcpy(pMeta, &entry, sizeof(SMetaBlock));
      return 0;
    }

    consumed += step;
    cursor += step;
  }

  return TSDB_CODE_NOT_FOUND;
}
/*
 * Decode every meta entry stored in `pBlock` and push it onto `pMeta`.
 * Returns 0 on success, `terrno` when a push fails.
 */
int32_t blockGetAllMeta(SBlock *pBlock, SArray *pMeta) {
  uint8_t *cursor = (uint8_t *)pBlock->data;

  for (int32_t consumed = 0; consumed < pBlock->len;) {
    SMetaBlock entry = {0};
    int32_t    step = metaBlockDecode(&entry, (char *)cursor);

    if (taosArrayPush(pMeta, &entry) == NULL) {
      return terrno;
    }

    consumed += step;
    cursor += step;
  }

  return 0;
}
/*
 * Serialize one meta entry into `buf`: type and version (fixed 8-bit), reserve
 * (fixed 16-bit), then offset, size, range.sseq and range.eseq as
 * variant-length integers. Returns the number of bytes written.
 */
int32_t metaBlockEncode(SMetaBlock *pMeta, char *buf) {
  void   *cursor = buf;
  int32_t total = 0;

  total += taosEncodeFixedI8(&cursor, pMeta->type);
  total += taosEncodeFixedI8(&cursor, pMeta->version);
  total += taosEncodeFixedI16(&cursor, pMeta->reserve);

  total += taosEncodeVariantI64(&cursor, pMeta->offset);
  total += taosEncodeVariantI64(&cursor, pMeta->size);
  total += taosEncodeVariantI64(&cursor, pMeta->range.sseq);
  total += taosEncodeVariantI64(&cursor, pMeta->range.eseq);

  return total;
}
/*
 * Deserialize one meta entry from `buf`, reading the fields in the order
 * metaBlockEncode() writes them. Returns the number of bytes consumed.
 */
int32_t metaBlockDecode(SMetaBlock *pMeta, char *buf) {
  char *cursor = buf;

  cursor = taosDecodeFixedI8(cursor, &pMeta->type);
  cursor = taosDecodeFixedI8(cursor, &pMeta->version);
  cursor = taosDecodeFixedI16(cursor, &pMeta->reserve);

  cursor = taosDecodeVariantI64(cursor, &pMeta->offset);
  cursor = taosDecodeVariantI64(cursor, &pMeta->size);
  cursor = taosDecodeVariantI64(cursor, &pMeta->range.sseq);
  cursor = taosDecodeVariantI64(cursor, &pMeta->range.eseq);

  return cursor - buf;
}
/*
 * Encode the meta entry `pBlk` at the end of the block payload and advance
 * p->len by the encoded size, which is also returned.
 */
int32_t metaBlockAdd(SBlock *p, SMetaBlock *pBlk) {
  char   *tail = (char *)((uint8_t *)p->data + p->len);
  int32_t written = metaBlockEncode(pBlk, tail);

  p->len += written;
  return written;
}
/*
 * Decode one meta entry starting at payload offset p->len into `pBlk`, then
 * advance p->len by the number of bytes consumed, which is also returned.
 */
int32_t metaBlockGet(SBlock *p, SMetaBlock *pBlk) {
  char   *at = (char *)((uint8_t *)p->data + p->len);
  int32_t consumed = metaBlockDecode(pBlk, at);

  p->len += consumed;
  return consumed;
}
/*
 * Write the block held by `pBlkW` to `pFile` at pHandle->offset.
 *
 * The block trailer is stamped, the checksum appended and the bytes written;
 * `*nWrite` receives the number of bytes written. An empty block is a no-op
 * that returns 0 without touching `*nWrite`.
 *
 * The compress type is fixed to kNoCompres here, so the compression branch is
 * currently not taken. The scratch wrapper is now released on every exit path;
 * it used to be released on the success path only.
 */
int32_t tableFlushBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW, int32_t *nWrite) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SBlockWrapper wrapper = {0};
  SBlock       *pBlk = pBlkW->data;

  if (pBlk->len == 0) {
    return 0;
  }

  int8_t   compressType = kNoCompres;
  uint8_t *pWrite = (uint8_t *)pBlk;
  int32_t  len = BLOCK_TOTAL_SIZE(pBlk);

  BLOCK_SET_COMPRESS_TYPE(pBlk, compressType);
  BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));

  if (compressType != kNoCompres) {
    code = blockWrapperInit(&wrapper, len + 4);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t compressSize = wrapper.cap;
    code = bseCompressData(compressType, pWrite, BLOCK_ROW_SIZE(pBlk), wrapper.data, &compressSize);
    if (code != 0) {
      // compression failed: write the uncompressed block instead
      bseWarn("failed to compress data since %s, not set compress", tstrerror(TSDB_CODE_THIRDPARTY_ERROR));
      blockWrapperCleanup(&wrapper);
      BLOCK_SET_COMPRESS_TYPE(pBlk, kNoCompres);
      BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
    } else {
      int32_t rawSize = BLOCK_ROW_SIZE(pBlk);
      COMREPSS_DATA_SET_TYPE_AND_RAWLEN(wrapper.data, compressSize, compressType, rawSize);
      len = compressSize + BLOCK_TAIL_LEN;
      pWrite = (uint8_t *)wrapper.data;
    }
  }

  code = taosCalcChecksumAppend(0, (uint8_t *)pWrite, len);
  TSDB_CHECK_CODE(code, lino, _error);

  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
  if (n < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  int32_t nwrite = taosWriteFile(pFile, (uint8_t *)pWrite, len);
  if (nwrite != len) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  *nWrite = nwrite;

_error:
  if (code != 0) {
    bseError("failed to flush table builder at line %d since %s", lino, tstrerror(code));
  } else {
    bseDebug("flush at offset %" PRId64 ", size %d", pHandle->offset, len);
  }
  blockWrapperCleanup(&wrapper);
  return code;
}
/*
 * Read the block described by `pHandle` from `pFile` into `pBlkW`, verify its
 * checksum and, when the trailer says it is compressed, replace the buffer of
 * `pBlkW` with the decompressed block.
 *
 * Returns 0 on success; TSDB_CODE_FILE_CORRUPTED for short reads, checksum
 * mismatches, failed decompression or inconsistent lengths; other codes come
 * from allocation or seek failures.
 *
 * Fixes relative to the previous version:
 *  - `pHelp` is initialized before the first `goto _error`. It used to be
 *    declared after it, so a failed resize jumped over the initializer and the
 *    cleanup below ran on an indeterminate wrapper.
 *  - a handle smaller than the block trailer is rejected up front, because the
 *    trailer accessors would otherwise read in front of the buffer.
 */
int32_t tableLoadBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SBlockWrapper pHelp = {0};

  if (pHandle->size < BLOCK_TAIL_LEN) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  code = blockWrapperResize(pBlkW, pHandle->size + 16);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlock  *pBlk = pBlkW->data;
  uint8_t *pRead = (uint8_t *)pBlk;

  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
  if (n < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  int32_t nr = taosReadFile(pFile, pRead, pHandle->size);
  if (nr != pHandle->size) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  if (taosCheckChecksumWhole((uint8_t *)pRead, pHandle->size) != 1) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  uint8_t compressType = 0;
  int32_t rawSize = 0;
  COMPRESS_DATA_GET_TYPE_AND_RAWLEN(pRead, pHandle->size, compressType, rawSize);

  if (compressType != kNoCompres) {
    code = blockWrapperInit(&pHelp, rawSize);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t unCompressSize = pHelp.cap;
    code = bseDecompressData(compressType, pRead, pHandle->size - BLOCK_TAIL_LEN, pHelp.data, &unCompressSize);
    if (code != 0) {
      code = TSDB_CODE_FILE_CORRUPTED;
      TSDB_CHECK_CODE(code, lino, _error);
    }

    // the decompressed image must be exactly header + payload
    SBlock *p = pHelp.data;
    if (BLOCK_ROW_SIZE_OFFSET(p) != unCompressSize) {
      code = TSDB_CODE_FILE_CORRUPTED;
      TSDB_CHECK_CODE(code, lino, _error);
    }

    // hand the decompressed buffer over to the caller's wrapper
    blockWrapperCleanup(pBlkW);
    blockWrapperTransfer(pBlkW, &pHelp);
  } else {
    if (pBlk->len != (pHandle->size - BLOCK_TAIL_LEN - sizeof(SBlock))) {
      code = TSDB_CODE_FILE_CORRUPTED;
      TSDB_CHECK_CODE(code, lino, _error);
    }
  }

_error:
  if (code != 0) {
    bseError("failed to load block at lino %d since %s, read at offset %" PRId64 ", size:%" PRId64 "", lino,
             tstrerror(code), pHandle->offset, pHandle->size);
  } else {
    bseDebug("read at offset %" PRId64 ", size %" PRId64 "", pHandle->offset, pHandle->size);
  }
  blockWrapperCleanup(&pHelp);
  return code;
}
/*
 * Read pHandle->size raw bytes at pHandle->offset from `pFile` into the buffer
 * of `pBlkW`, placed behind room for an SBseSnapMeta header, and optionally
 * verify the checksum. On success pBlkW->size is set to header + block size.
 *
 * The caller must have sized `pBlkW` beforehand; no resize happens here.
 * Short reads and checksum mismatches yield TSDB_CODE_FILE_CORRUPTED.
 */
int32_t tableLoadRawBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW, int8_t checkSum) {
  int32_t  code = 0;
  int32_t  lino = 0;
  uint8_t *pDst = (uint8_t *)pBlkW->data + sizeof(SBseSnapMeta);

  if (taosLSeekFile(pFile, pHandle->offset, SEEK_SET) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  int32_t nRead = taosReadFile(pFile, pDst, pHandle->size);
  if (nRead != pHandle->size) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  if (checkSum && taosCheckChecksumWhole(pDst, pHandle->size) != 1) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  pBlkW->size = pHandle->size + sizeof(SBseSnapMeta);

_error:
  if (code != 0) {
    bseError("failed to load block at lino %d since %s", lino, tstrerror(code));
  }
  return code;
}
/* Return 1 when sseq <= seq <= eseq, otherwise 0. */
int8_t seqRangeContains(SSeqRange *p, int64_t seq) {
  if (seq < p->sseq) {
    return 0;
  }
  return seq <= p->eseq;
}
/* Mark the range as unset: both bounds become -1. */
void seqRangeReset(SSeqRange *p) {
  p->eseq = -1;
  p->sseq = -1;
}
/* Return 1 when `seq` lies beyond the end of the range (seq > eseq), otherwise 0. */
int8_t seqRangeIsGreater(SSeqRange *p, int64_t seq) {
  return p->eseq < seq;
}
/*
 * Extend `dst` with `src`: the start is taken from `src` only while `dst` is
 * still unset (sseq == -1); the end is always overwritten with src->eseq.
 */
void seqRangeUpdate(SSeqRange *dst, SSeqRange *src) {
  dst->sseq = (dst->sseq == -1) ? src->sseq : dst->sseq;
  dst->eseq = src->eseq;
}
/*
 * Give the wrapper a fresh zero-filled buffer of `cap` bytes.
 * Returns 0 on success; on allocation failure p->data is NULL, p->cap is left
 * unchanged and `terrno` is returned.
 */
int32_t blockWrapperInit(SBlockWrapper *p, int32_t cap) {
  void *buf = taosMemoryCalloc(1, cap);

  p->data = buf;
  if (buf == NULL) {
    return terrno;
  }

  p->cap = cap;
  return 0;
}
/* Free the wrapper's buffer (if any) and reset data/cap to NULL/0. */
void blockWrapperCleanup(SBlockWrapper *p) {
  void *owned = p->data;

  p->data = NULL;
  p->cap = 0;

  if (owned != NULL) {
    taosMemoryFree(owned);
  }
}
/*
 * Move the buffer and capacity from `src` to `dst` and leave `src` empty.
 * Whatever `dst` held before is overwritten, not freed. Does nothing when
 * either pointer is NULL.
 */
void blockWrapperTransfer(SBlockWrapper *dst, SBlockWrapper *src) {
  if (dst != NULL && src != NULL) {
    dst->cap = src->cap;
    dst->data = src->data;

    src->cap = 0;
    src->data = NULL;
  }
}
/*
 * Make sure the wrapper's buffer holds at least `newCap` bytes.
 *
 * Capacity grows by doubling (starting from 1024 for an empty wrapper) and the
 * existing contents are kept through realloc; bytes beyond the old capacity are
 * not initialized. Nothing happens when the buffer is already large enough.
 * Returns 0 on success, `terrno` when the reallocation fails (the wrapper then
 * keeps its old buffer).
 *
 * The doubling loop is guarded against signed overflow: once another doubling
 * would exceed INT32_MAX, the requested size itself is used.
 */
int32_t blockWrapperResize(SBlockWrapper *p, int32_t newCap) {
  if (p->cap >= newCap) {
    return 0;
  }

  int32_t cap = p->cap;
  if (cap == 0) cap = 1024;

  while (cap < newCap) {
    if (cap > INT32_MAX / 2) {
      cap = newCap;
      break;
    }
    cap = cap * 2;
  }

  void *data = taosMemoryRealloc(p->data, cap);
  if (data == NULL) {
    return terrno;
  }

  p->data = data;
  p->cap = cap;
  return 0;
}
// Reset the block stored in the wrapper's buffer via blockClear.
void blockWrapperClear(SBlockWrapper *p) {
  blockClear((SBlock *)p->data);
}
// Stamp the block stored in the wrapper's buffer with the given type tag.
void blockWrapperSetType(SBlockWrapper *p, int8_t type) {
  ((SBlock *)p->data)->type = type;
}
// Create an iterator that walks the raw blocks of the table identified by
// `retention`.
//
// `type` selects the starting section:
//   - BSE_TABLE_DATA_TYPE: the block handles come from tableReaderGetMeta;
//   - BSE_TABLE_META_TYPE: the handles are copied from the table's meta reader;
//   - anything else: the iterator is created already finished (isOver = 1).
//
// On success *ppIter receives the iterator, which must be released with
// tableReaderIterDestroy.  On failure the partially built iterator is
// destroyed here and *ppIter is NOT written.
//
// Returns 0 on success or the first error code encountered.
int32_t tableReaderIterInit(int64_t retention, int8_t type, STableReaderIter **ppIter, SBse *pBse) {
  int32_t code = 0;
  int32_t lino = 0;

  STableReaderIter *p = taosMemCalloc(1, sizeof(STableReaderIter));
  if (p == NULL) {
    return terrno;
  }
  p->retentionTs = retention;

  SSubTableMgt *retentionMgt = NULL;
  code = createSubTableMgt(retention, 1, pBse->pTableMgt, &retentionMgt);
  TSDB_CHECK_CODE(code, lino, _error);
  p->pSubMgt = retentionMgt;

  code = tableReaderOpen(retention, &p->pTableReader, retentionMgt->pReaderMgt);
  TSDB_CHECK_CODE(code, lino, _error);
  tableReaderShouldPutToCache(p->pTableReader, 0);

  p->blockIndex = 0;
  p->blockType = type;
  if (p->blockType == BSE_TABLE_DATA_TYPE) {
    code = tableReaderGetMeta(p->pTableReader, &p->pMetaHandle);
    TSDB_CHECK_CODE(code, lino, _error);
  } else if (p->blockType == BSE_TABLE_META_TYPE) {
    p->pMetaHandle = taosArrayInit(8, sizeof(SBlkHandle));
    if (p->pMetaHandle == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }
    code = tableMetaReaderLoadMetaHandle(p->pTableReader->pMetaReader, p->pMetaHandle);
    // Must be checked before publishing p: otherwise the caller would receive
    // a pointer that the error path below has already freed.
    TSDB_CHECK_CODE(code, lino, _error);
  } else {
    p->isOver = 1;
  }

  *ppIter = p;

_error:
  if (code != 0) {
    bseError("failed to init table reader iter since %s", tstrerror(code));
    tableReaderIterDestroy(p);
  }
  return code;
}
// Advance the iterator by one raw block.
//
// Depending on pIter->blockType the next block is loaded into
// pIter->blockWrapper from the data section, the meta section, the meta index
// or the footer.  When a block was loaded, *pValue / *len are pointed at the
// iterator's own buffer (pIter->blockWrapper.data / .size); the buffer is not
// copied here.
//
// When the data section is exhausted, or the END state is reached, isOver is
// set and the function returns without writing *pValue / *len.
//
// Returns 0 on success; on failure the error code is returned and the iterator
// is marked finished.
int32_t tableReaderIterNext(STableReaderIter *pIter, uint8_t **pValue, int32_t *len) {
  int32_t code = 0;
  int32_t lino = 0;
  // Only snapMeta.keepDays is read later in this function; the remaining
  // fields are filled in but not consumed here.
  SBseSnapMeta snapMeta = {0};
  snapMeta.range.sseq = -1;
  snapMeta.range.eseq = -1;
  snapMeta.keepDays = pIter->retentionTs;
  snapMeta.fileType = pIter->fileType;
  snapMeta.blockType = pIter->blockType;
  if (pIter->blockType == BSE_TABLE_DATA_TYPE) {
    SBlkHandle *pHandle = NULL;
    if (pIter->blockIndex >= taosArrayGetSize(pIter->pMetaHandle)) {
      // All data blocks delivered: drop the handle list and finish.
      taosArrayDestroy(pIter->pMetaHandle);
      pIter->pMetaHandle = NULL;
      pIter->blockIndex = 0;
      pIter->isOver = 1;
      return 0;
    } else {
      pHandle = taosArrayGet(pIter->pMetaHandle, pIter->blockIndex);
      bseDebug("file type %d, block type: %d,block index %d, offset %" PRId64 ", size %" PRId64 ", range [%" PRId64
               ", %" PRId64 "]",
               pIter->fileType, pIter->blockType, pIter->blockIndex, pHandle->offset, pHandle->size,
               pHandle->range.sseq, pHandle->range.eseq);
      code = tableReaderLoadRawBlock(pIter->pTableReader, pHandle, &pIter->blockWrapper);
      TSDB_CHECK_CODE(code, lino, _error);
      pIter->blockIndex++;
    }
  } else if (pIter->blockType == BSE_TABLE_META_TYPE) {
    SBlkHandle *pHandle = NULL;
    if (pIter->blockIndex >= taosArrayGetSize(pIter->pMetaHandle)) {
      // Meta blocks exhausted: switch to the meta-index state, which is
      // handled by the second if-chain below within this same call.
      taosArrayDestroy(pIter->pMetaHandle);
      pIter->pMetaHandle = NULL;
      pIter->blockIndex = 0;
      pIter->blockType = BSE_TABLE_META_INDEX_TYPE;
    } else {
      pHandle = taosArrayGet(pIter->pMetaHandle, pIter->blockIndex);
      bseDebug("file type %d, block type: %d,block index %d, offset %" PRId64 ", size %" PRId64 ", range [%" PRId64
               ", %" PRId64 "]",
               pIter->fileType, pIter->blockType, pIter->blockIndex, pHandle->offset, pHandle->size,
               pHandle->range.sseq, pHandle->range.eseq);
      code = tableReaderLoadRawMeta(pIter->pTableReader, pHandle, &pIter->blockWrapper);
      TSDB_CHECK_CODE(code, lino, _error);
      pIter->blockIndex++;
    }
  }
  // Trailing sections: meta index -> footer -> end.  The state is advanced
  // immediately after each load.
  if (pIter->blockType == BSE_TABLE_META_INDEX_TYPE) {
    code = tableReaderLoadRawMetaIndex(pIter->pTableReader, &pIter->blockWrapper);
    TSDB_CHECK_CODE(code, lino, _error);
    pIter->blockType = BSE_TABLE_FOOTER_TYPE;
  } else if (pIter->blockType == BSE_TABLE_FOOTER_TYPE) {
    code = tableReaderLoadRawFooter(pIter->pTableReader, &pIter->blockWrapper);
    TSDB_CHECK_CODE(code, lino, _error);
    pIter->blockType = BSE_TABLE_END_TYPE;
  } else if (pIter->blockType == BSE_TABLE_END_TYPE) {
    pIter->isOver = 1;
    return code;
  }
  // The snapshot header is always stamped with a zeroed sequence range.
  SSeqRange range = {0};
  if (pIter->blockWrapper.data != NULL) {
    // NOTE(review): for the meta-index and footer blocks pIter->blockType has
    // already been advanced to the NEXT state at this point, so the header is
    // stamped with FOOTER / END rather than the type of the block just loaded.
    // Confirm that the receiving side expects this.
    updateSnapshotMeta(&pIter->blockWrapper, range, pIter->fileType, pIter->blockType, snapMeta.keepDays);
    *pValue = pIter->blockWrapper.data;
    *len = pIter->blockWrapper.size;
  }
_error:
  if (code != 0) {
    bseError("failed to load block since %s", tstrerror(code));
    pIter->isOver = 1;
  }
  return code;
}
// Return 1 while the iterator still has blocks to deliver, 0 once it has been
// marked finished.
int8_t tableReaderIterValid(STableReaderIter *pIter) {
  return pIter->isOver ? 0 : 1;
}
// Read the whole "current" file of this BSE instance into a freshly allocated
// buffer that is prefixed with a zeroed SBseSnapMeta header whose fileType is
// set to BSE_CURRENT_SNAP.
//
// On success *pValue points at the buffer (header + file bytes) and *len holds
// its total size; the buffer is allocated here and handed to the caller.
// When the file does not exist the function returns 0 and leaves *pValue and
// *len untouched.
//
// Returns 0 on success or an error code; a short read is always reported as a
// failure, even when terrno was not set by the read.
int32_t bseReadCurrentSnap(SBse *pBse, uint8_t **pValue, int32_t *len) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TdFilePtr fd = NULL;
  int64_t   sz = 0;
  char      name[TSDB_FILENAME_LEN] = {0};
  uint8_t  *pCurrent = NULL;

  bseBuildCurrentName(pBse, name);
  if (taosCheckExistFile(name) == 0) {
    bseInfo("vgId:%d, no current meta file found, skip recover", pBse->cfg.vgId);
    return 0;
  }

  code = taosStatFile(name, &sz, NULL, NULL);
  TSDB_CHECK_CODE(code, lino, _error);

  fd = taosOpenFile(name, TD_FILE_READ);
  if (fd == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  pCurrent = (uint8_t *)taosMemoryCalloc(1, sizeof(SBseSnapMeta) + sz);
  if (pCurrent == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int64_t nread = taosReadFile(fd, pCurrent + sizeof(SBseSnapMeta), sz);
  if (nread != sz) {
    // A short read does not necessarily set terrno; never hand a partially
    // filled buffer back as success.
    code = (terrno != 0) ? terrno : TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  taosCloseFile(&fd);

  SBseSnapMeta *pMeta = (SBseSnapMeta *)(pCurrent);
  pMeta->fileType = BSE_CURRENT_SNAP;
  *pValue = pCurrent;
  *len = sz + sizeof(SBseSnapMeta);

_error:
  if (code != 0) {
    bseError("vgId:%d, failed to read current at line %d since %s", pBse->cfg.vgId, lino, tstrerror(code));
    taosCloseFile(&fd);
    taosMemoryFree(pCurrent);
  }
  return code;
}
// Release everything owned by the iterator: the handle array, the table
// reader, the block buffer and the sub-table manager, then the iterator
// itself.  A NULL iterator is ignored.
void tableReaderIterDestroy(STableReaderIter *pIter) {
  if (pIter != NULL) {
    taosArrayDestroy(pIter->pMetaHandle);
    tableReaderClose(pIter->pTableReader);
    blockWrapperCleanup(&pIter->blockWrapper);
    destroySubTableMgt(pIter->pSubMgt);
    taosMemoryFree(pIter);
  }
}
// Build a (seq -> offset) index over the entries packed in pBlock->data.
//
// The payload is walked as a sequence of
//   varint64 seq | varint32 valueLen | valueLen bytes
// and for every entry the sequence number plus the offset of its length field
// (relative to pBlock->data) is recorded in the result's pMeta array.
//
// On success *pMeta receives the new object, which references pBlock without
// copying it; release it with blockWithMetaCleanup.
//
// Returns 0 on success or terrno on allocation failure.
int32_t blockWithMetaInit(SBlock *pBlock, SBlockWithMeta **pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockWithMeta *pResult = taosMemCalloc(1, sizeof(SBlockWithMeta));
  if (pResult == NULL) {
    return terrno;
  }
  pResult->pBlock = pBlock;

  pResult->pMeta = taosArrayInit(8, sizeof(SBlockIndexMeta));
  if (pResult->pMeta == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  uint8_t *pBegin = (uint8_t *)pBlock->data;
  uint8_t *pCursor = (uint8_t *)pBegin;
  while (pCursor - pBegin < pBlock->len) {
    int64_t seqNo;
    int32_t valueLen = 0;

    pCursor = taosDecodeVariantI64((void **)pCursor, &seqNo);
    // The recorded offset points at the length prefix, not at the value.
    int32_t lenOffset = pCursor - pBegin;
    pCursor = taosDecodeVariantI32((void **)pCursor, &valueLen);

    SBlockIndexMeta entry = {0};
    entry.seq = seqNo;
    entry.offset = lenOffset;
    if (taosArrayPush(pResult->pMeta, &entry) == NULL) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _error);
    }

    pCursor += valueLen;
  }

  *pMeta = pResult;

_error:
  if (code != 0) {
    bseError("failed to init block with meta since %s", tstrerror(code));
    blockWithMetaCleanup(pResult);
  }
  return code;
}
// Free the index array and the wrapper object; the referenced block itself is
// not freed here.  Always returns 0; NULL is accepted.
int32_t blockWithMetaCleanup(SBlockWithMeta *p) {
  if (p != NULL) {
    taosArrayDestroy(p->pMeta);
    taosMemoryFree(p);
  }
  return 0;
}
int comprareFunc(const void *pLeft, const void *pRight) {
SBlockIndexMeta *p1 = (SBlockIndexMeta *)pLeft;
SBlockIndexMeta *p2 = (SBlockIndexMeta *)pRight;
if (p1->seq > p2->seq) {
return 1;
} else if (p1->seq < p2->seq) {
return -1;
}
return 0;
}
// Look up the entry with sequence number `seq` and return a copy of its value.
//
// The index built by blockWithMetaInit is searched for an exact match; the
// value's length prefix is decoded from the block at the recorded offset and
// the value bytes are copied into a newly allocated buffer.
//
// On success *pValue points at the copy (allocated here and handed to the
// caller) and *len holds its size.
//
// Returns 0 on success, TSDB_CODE_NOT_FOUND when the sequence number is
// absent or its stored length is not positive, or terrno on allocation
// failure.
int32_t blockWithMetaSeek(SBlockWithMeta *p, int64_t seq, uint8_t **pValue, int32_t *len) {
  int32_t code = 0;

  // The comparator casts both operands to SBlockIndexMeta *, so the search key
  // must be a real SBlockIndexMeta rather than a bare int64_t.
  SBlockIndexMeta key = {.seq = seq, .offset = 0};
  int32_t         idx = taosArraySearchIdx(p->pMeta, &key, comprareFunc, TD_EQ);
  if (idx < 0) {
    return TSDB_CODE_NOT_FOUND;
  }

  SBlockIndexMeta *pMeta = taosArrayGet(p->pMeta, idx);
  if (pMeta == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  uint8_t *data = (uint8_t *)p->pBlock->data + pMeta->offset;
  data = taosDecodeVariantI32((void *)data, len);
  if (*len <= 0) {
    return TSDB_CODE_NOT_FOUND;
  }

  *pValue = taosMemCalloc(1, *len);
  if (*pValue == NULL) {
    return terrno;
  }
  memcpy(*pValue, data, *len);
  return code;
}
// Allocate a table-meta object bound to the BSE instance of pMetaMgt.
//
// `name` may be NULL; otherwise it is copied (including the terminator) into
// the object's name field.  The block capacity is taken from the BSE
// configuration through BSE_GET_BLOCK_SIZE.
//
// NOTE(review): the copy is not bounded by the size of p->name; presumably
// callers never pass names longer than that field - confirm.
//
// On success *pMeta receives the object; release it with tableMetaClose.
// Returns 0 on success or terrno when the allocation fails.
int32_t tableMetaOpen(char *name, SBTableMeta **pMeta, void *pMetaMgt) {
  int32_t code = 0;
  int32_t lino = 0;

  SBTableMeta *p = taosMemCalloc(1, sizeof(SBTableMeta));
  if (p == NULL) {
    // Propagate the allocation error instead of falling through with
    // code == 0 and dereferencing NULL below.
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  if (name != NULL) {
    memcpy(p->name, name, strlen(name) + 1);
  }
  p->pBse = ((STableMetaMgt *)pMetaMgt)->pBse;
  p->blockCap = BSE_GET_BLOCK_SIZE((SBse *)p->pBse);
  *pMeta = p;

_error:
  if (code != 0) {
    bseError("failed to open table meta %s at line %d since %s", (name != NULL) ? name : "(null)", lino,
             tstrerror(code));
    tableMetaClose(p);
  }
  return code;
}
// Rewrite the table's meta file so that it contains every existing meta block
// followed by the new entries in pBlock.
//
// Steps:
//   1. open a writer on the temporary meta file and a reader on the current
//      meta file;
//   2. copy each existing meta block verbatim into the writer, folding its
//      sequence range into pMeta->range;
//   3. queue the new entries and commit the writer (blocks, index, footer);
//   4. close both files and rename the temporary file over the current one.
//
// Returns 0 on success or the first error code encountered.  The writer,
// reader and iterator are always released before returning.
int32_t tableMetaCommit(SBTableMeta *pMeta, SArray *pBlock) {
  int32_t                code = 0;
  int32_t                lino = 0;
  SBtableMetaWriter     *pWriter = NULL;
  SBtableMetaReader     *pReader = NULL;
  SBtableMetaReaderIter *pIter = NULL;

  char tempMetaName[TSDB_FILENAME_LEN] = {0};
  char metaName[TSDB_FILENAME_LEN] = {0};
  char tempMetaPath[TSDB_FILENAME_LEN] = {0};
  char metaPath[TSDB_FILENAME_LEN] = {0};

  bseBuildTempMetaName(pMeta->retentionTs, tempMetaName);
  bseBuildMetaName(pMeta->retentionTs, metaName);

  code = tableMetaWriterInit(pMeta, tempMetaName, &pWriter);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderInit(pMeta, metaName, &pReader);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderOpenIter(pReader, &pIter);
  TSDB_CHECK_CODE(code, lino, _error);

  // Carry the already persisted meta blocks over unchanged.
  while (pIter->isOver == 0) {
    SBlkHandle    blkHandle = {0};
    SBlockWrapper wrapper;

    code = tableMetaReaderIterNext(pIter, &wrapper, &blkHandle);
    TSDB_CHECK_CODE(code, lino, _error);
    if (pIter->isOver) {
      break;
    }

    blockWrapperSetType(&wrapper, BSE_TABLE_META_TYPE);
    code = tableMetaWriteAppendRawBlock(pWriter, &wrapper, &blkHandle);
    TSDB_CHECK_CODE(code, lino, _error);

    seqRangeUpdate(&pMeta->range, &blkHandle.range);
  }

  code = tableMetaWriterAppendBlock(pWriter, pBlock);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaWriterCommit(pWriter);
  TSDB_CHECK_CODE(code, lino, _error);

  // Both files are closed before the rename; the pointers are cleared so the
  // shared exit path below does not close them a second time.
  tableMetaWriterClose(pWriter);
  tableMetaReaderClose(pReader);
  pWriter = NULL;
  pReader = NULL;

  bseBuildFullName(pMeta->pBse, tempMetaName, tempMetaPath);
  bseBuildFullName(pMeta->pBse, metaName, metaPath);
  code = taosRenameFile(tempMetaPath, metaPath);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to commit table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  tableMetaReaderIterClose(pIter);
  tableMetaWriterClose(pWriter);
  tableMetaReaderClose(pReader);
  return code;
}
// Queue every element of pBlock on the writer's pending list; nothing is
// written to the file here.
// Returns 0 on success or terrno when taosArrayAddAll reports failure.
int32_t tableMetaWriterAppendBlock(SBtableMetaWriter *pMeta, SArray *pBlock) {
  return (taosArrayAddAll(pMeta->pBlock, pBlock) == NULL) ? terrno : 0;
}
// Pack the queued SMetaBlock entries into meta blocks and write them out.
//
// Entries are appended to the writer's block buffer one by one.  Before each
// append, if the estimated block size has reached blockCap, the current block
// is flushed to the file, its handle (file offset, bytes written, sequence
// range) is recorded in pBlkHandle and a fresh block is started.  Whatever is
// left after the loop is flushed as the final block.  If no entry was packed
// since the last flush, nothing more is written.
//
// The writer stays owned by the caller on every path: this function never
// closes or frees it, so the caller's own cleanup cannot double-free it.
//
// Returns 0 on success or the first error code encountered.
int32_t tableMetaWriterFlushBlock(SBtableMetaWriter *pMeta) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SSeqRange range = {.sseq = -1, .eseq = -1};
  int64_t   offset = 0;
  int32_t   nWrite = 0;
  int32_t   size = pMeta->blockCap;

  blockWrapperClear(&pMeta->blockWrapper);
  code = blockWrapperResize(&pMeta->blockWrapper, size);
  TSDB_CHECK_CODE(code, lino, _error);

  for (int32_t i = 0; i < taosArrayGetSize(pMeta->pBlock); i++) {
    SMetaBlock *pBlk = taosArrayGet(pMeta->pBlock, i);

    if (blockEsimateSize(pMeta->blockWrapper.data, sizeof(SMetaBlock)) >= pMeta->blockCap) {
      // Current block is full: write it out and start a new one.
      SBlkHandle handle = {.offset = pMeta->offset, .size = offset, .range = range};
      blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_TYPE);
      code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
      TSDB_CHECK_CODE(code, lino, _error);

      pMeta->offset += nWrite;
      // The recorded size is the number of bytes actually written.
      handle.size = nWrite;

      blockWrapperClear(&pMeta->blockWrapper);
      code = blockWrapperResize(&pMeta->blockWrapper, size);
      TSDB_CHECK_CODE(code, lino, _error);

      if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
        TSDB_CHECK_CODE(code = terrno, lino, _error);
      }
      range.sseq = -1;
      offset = 0;
    }

    offset += metaBlockAdd(pMeta->blockWrapper.data, pBlk);
    if (range.sseq == -1) {
      range.sseq = pBlk->range.sseq;
    }
    range.eseq = pBlk->range.eseq;
  }

  if (offset == 0) {
    return 0;
  }

  blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_TYPE);
  SBlkHandle handle = {.offset = pMeta->offset, .size = offset, .range = range};
  code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
  TSDB_CHECK_CODE(code, lino, _error);

  pMeta->offset += nWrite;
  handle.size = nWrite;
  if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

_error:
  if (code != 0) {
    bseError("failed to flush table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
// Write the meta index block: one encoded SBlkHandle per meta block flushed
// so far, then remember where that index block lives in the footer.
//
// Returns 0 on success or the first error code encountered.
int32_t tableMetaWriterFlushIndex(SBtableMetaWriter *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nWrite = 0;
  // File offset at which the index block will start.
  int64_t lastOffset = pMeta->offset;
  int32_t blkHandleSize = 0;
  // Extra bytes requested on top of the raw handle array size when resizing
  // the buffer.
  int32_t extra = 8;
  int32_t size = taosArrayGetSize(pMeta->pBlkHandle) * sizeof(SBlkHandle);
  SSeqRange range = {-1, -1};
  blockWrapperClear(&pMeta->blockWrapper);
  code = blockWrapperResize(&pMeta->blockWrapper, size + extra);
  TSDB_CHECK_CODE(code, lino, _error);
  // Encode every handle into the block and merge the sequence ranges.
  for (int32_t i = 0; i < taosArrayGetSize(pMeta->pBlkHandle); i++) {
    SBlkHandle *pHandle = taosArrayGet(pMeta->pBlkHandle, i);
    if (pHandle == NULL) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }
    blkHandleSize += metaBlockAddIndex(pMeta->blockWrapper.data, pHandle);
    seqRangeUpdate(&range, &pHandle->range);
  }
  blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_INDEX_TYPE);
  SBlkHandle handle = {.offset = lastOffset, .size = blkHandleSize, .range = range};
  code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
  TSDB_CHECK_CODE(code, lino, _error);
  // metaHandle describes the index block just written (offset before the
  // increment below, size = bytes written).
  SBlkHandle metaHandle = {.offset = pMeta->offset, .size = nWrite, .range = range};
  // NOTE(review): indexHandle is computed but never used; both footer slots
  // below receive metaHandle.  Within this chunk only footer.metaHandle is
  // read back (by tableMetaReaderLoadIndex), so this may be intentional or a
  // copy/paste slip - confirm before changing, since it affects the on-disk
  // footer contents.
  SBlkHandle indexHandle = {.offset = pMeta->offset + nWrite, .size = 0, .range = range};
  pMeta->offset += nWrite;
  memcpy(pMeta->footer.metaHandle, &metaHandle, sizeof(SBlkHandle));
  memcpy(pMeta->footer.indexHandle, &metaHandle, sizeof(SBlkHandle));
_error:
  if (code != 0) {
    bseError("failed to build table meta index at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
// Encode the writer's footer into a fixed kEncodeLen-byte buffer and write it
// to the file.  The writer's offset is advanced by the footer size before the
// write is issued.
//
// Returns 0 on success, the footerEncode error, or terrno when fewer bytes
// than expected were written.
int32_t tableMetaWriterFlushFooter(SBtableMetaWriter *p) {
  char    encoded[kEncodeLen] = {0};
  int32_t code = 0;
  int32_t lino = 0;

  code = footerEncode(&p->footer, encoded);
  TSDB_CHECK_CODE(code, lino, _error);

  p->offset += sizeof(encoded);

  int32_t written = taosWriteFile(p->pFile, encoded, sizeof(encoded));
  if (written != sizeof(encoded)) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

_error:
  if (code != 0) {
    bseError("failed to add footer to table builder at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
// Finish the meta file: flush the pending meta blocks, then the index block,
// then the footer, in that order.
//
// The writer stays owned by the caller on every path.  It is deliberately NOT
// closed here on failure, because the caller releases it in its own cleanup
// path and closing it here as well would free it twice.
//
// Returns 0 on success or the first error code encountered.
int32_t tableMetaWriterCommit(SBtableMetaWriter *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tableMetaWriterFlushBlock(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaWriterFlushIndex(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaWriterFlushFooter(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to commit table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
// Write an already encoded block to the meta file and record its new
// location.
//
// The block is flushed through tableFlushBlock; a handle holding the writer's
// current offset, the number of bytes written and the sequence range taken
// from pBlkHandle is appended to the writer's handle list, and the writer's
// offset is advanced.
//
// The writer stays owned by the caller on every path.  It is deliberately NOT
// closed here on failure, because the caller releases it in its own cleanup
// path and closing it here as well would free it twice.
//
// Returns 0 on success or the first error code encountered.
int32_t tableMetaWriteAppendRawBlock(SBtableMetaWriter *pMeta, SBlockWrapper *pBlock, SBlkHandle *pBlkHandle) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nwrite = 0;

  code = tableFlushBlock(pMeta->pFile, pBlkHandle, pBlock, &nwrite);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlkHandle handle = {.offset = pMeta->offset, .size = nwrite, .range = pBlkHandle->range};
  if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }
  pMeta->offset += nwrite;

_error:
  if (code != 0) {
    bseError("failed to append block to table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
// Read and decode the fixed-size footer stored in the last kEncodeLen bytes
// of the meta file.
//
// When the reader has no open file (pFile == NULL) there is nothing to load
// and 0 is returned.
//
// Returns 0 on success, terrno when seeking or reading fails,
// TSDB_CODE_FILE_CORRUPTED when the read comes up short without terrno being
// set, or the footerDecode error.
int32_t tableMetaReaderLoadFooter(SBtableMetaReader *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;
  char    footer[kEncodeLen] = {0};

  if (pMeta->pFile == NULL) {
    return 0;
  }

  int64_t n = taosLSeekFile(pMeta->pFile, -kEncodeLen, SEEK_END);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  if (taosReadFile(pMeta->pFile, footer, kEncodeLen) != kEncodeLen) {
    // A short read does not necessarily set terrno; never decode a partially
    // filled footer.
    code = (terrno != 0) ? terrno : TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  code = footerDecode(&pMeta->footer, footer);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to load table meta footer %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
// Open `name` for reading (read != 0) or for read/write/create/append.
//
// Behaviour by case:
//   - file missing, read mode : returns 0 and leaves *pFile / *size untouched;
//   - file missing, write mode: the file is opened (created) and *pFile is set;
//   - file present            : its size is stored in *size; a size <= 0 is
//                               reported as TSDB_CODE_NOT_FOUND; otherwise the
//                               file is opened and *pFile is set.
//
// Returns 0 on success or an error code.
int32_t tableOpenFile(char *name, int8_t read, TdFilePtr *pFile, int64_t *size) {
  int32_t   lino = 0;
  int32_t   code = 0;
  TdFilePtr fp = NULL;
  int32_t   flags = read ? TD_FILE_READ : (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_APPEND);

  if (taosCheckExistFile(name)) {
    code = taosStatFile(name, size, NULL, NULL);
    TSDB_CHECK_CODE(code, lino, _error);

    if (*size <= 0) {
      code = TSDB_CODE_NOT_FOUND;
      TSDB_CHECK_CODE(code, lino, _error);
    }
  } else if (read) {
    // Nothing to read from; this is not treated as an error.
    return 0;
  }

  fp = taosOpenFile(name, flags);
  if (fp == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  *pFile = fp;

_error:
  if (code != 0) {
    bseError("failed to open table meta %s at line %d since %s", name, lino, tstrerror(code));
  }
  return code;
}
// Open the file `name` through tableOpenFile and store the handle in
// pMeta->pFile.  The file size reported by tableOpenFile is discarded.
// Returns the tableOpenFile result.
int32_t tableMetaOpenFile(SBtableMetaWriter *pMeta, int8_t read, char *name) {
  int32_t lino = 0;
  int64_t fileSize = 0;
  int32_t code = tableOpenFile(name, read, &pMeta->pFile, &fileSize);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to open table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
// Open the reader's meta file (read-only) and load its footer and block
// index.
//
// The file is opened directly through tableOpenFile using the reader's own
// name / pFile fields, so no SBtableMetaReader pointer has to be passed to an
// interface that is declared for SBtableMetaWriter.  A missing file is not an
// error: pFile stays NULL and the two load steps then return 0 without doing
// anything.
//
// Returns 0 on success or the first error code encountered.
int32_t tableMetaReaderLoad(SBtableMetaReader *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t size = 0;

  code = tableOpenFile(pMeta->name, 1, &pMeta->pFile, &size);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderLoadFooter(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderLoadIndex(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to load table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
// Free a table-meta object created by tableMetaOpen.  NULL is ignored.
void tableMetaClose(SBTableMeta *p) {
  if (p != NULL) {
    taosMemoryFree(p);
  }
}
// Create a meta-file writer for `name` (resolved to a full path through
// bseBuildFullName) and open the file for writing.
//
// The writer gets an empty block-handle list, an empty pending-entry list and
// a 1024-byte block buffer.  Every allocation is checked; on any failure the
// partially built writer is released here and *ppWriter is not written.
//
// NOTE(review): the writer's own name field is not filled in here although
// other writer functions print it in their log messages - confirm whether
// that is intended.
//
// On success *ppWriter receives the writer; release it with
// tableMetaWriterClose.  Returns 0 on success or an error code.
int32_t tableMetaWriterInit(SBTableMeta *pMeta, char *name, SBtableMetaWriter **ppWriter) {
  int32_t code = 0;
  int32_t lino = 0;
  char    path[TSDB_FILENAME_LEN] = {0};
  bseBuildFullName(pMeta->pBse, name, path);

  SBtableMetaWriter *p = taosMemCalloc(1, sizeof(SBtableMetaWriter));
  if (p == NULL) {
    return terrno;
  }
  p->pTableMeta = pMeta;
  p->blockCap = pMeta->blockCap;

  p->pBlkHandle = taosArrayInit(128, sizeof(SBlkHandle));
  if (p->pBlkHandle == NULL) {
    // code must carry the failure, otherwise the check below is a no-op.
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  p->pBlock = taosArrayInit(128, sizeof(SMetaBlock));
  if (p->pBlock == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = blockWrapperInit(&p->blockWrapper, 1024);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaOpenFile(p, 0, path);
  TSDB_CHECK_CODE(code, lino, _error);

  *ppWriter = p;

_error:
  if (code != 0) {
    bseError("failed to init table meta writer %s at line %d since %s", pMeta->name, lino, tstrerror(code));
    tableMetaWriterClose(p);
  }
  return code;
}
// Close the writer's file and release its handle list, pending-entry list,
// block buffer and the writer object itself.  NULL is ignored.
void tableMetaWriterClose(SBtableMetaWriter *p) {
  if (p != NULL) {
    taosCloseFile(&p->pFile);
    taosArrayDestroy(p->pBlkHandle);
    taosArrayDestroy(p->pBlock);
    blockWrapperCleanup(&p->blockWrapper);
    taosMemoryFree(p);
  }
}
// Create a meta-file reader for `name` (resolved to a full path through
// bseBuildFullName) and load the file's footer and block index.
//
// The full path is stored in the reader's name field.  Every allocation is
// checked; on any failure the partially built reader is released here and
// *ppReader is not written.
//
// On success *ppReader receives the reader; release it with
// tableMetaReaderClose.  Returns 0 on success or an error code.
int32_t tableMetaReaderInit(SBTableMeta *pMeta, char *name, SBtableMetaReader **ppReader) {
  int32_t code = 0;
  int32_t lino = 0;
  char    path[TSDB_FILENAME_LEN] = {0};
  bseBuildFullName(pMeta->pBse, name, path);

  SBtableMetaReader *p = taosMemCalloc(1, sizeof(SBtableMetaReader));
  if (p == NULL) {
    return terrno;
  }
  memcpy(p->name, path, sizeof(path));
  p->pTableMeta = pMeta;

  p->pBlkHandle = taosArrayInit(128, sizeof(SBlkHandle));
  if (p->pBlkHandle == NULL) {
    // code must carry the failure, otherwise the check below is a no-op.
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = blockWrapperInit(&p->blockWrapper, 1024);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderLoad(p);
  TSDB_CHECK_CODE(code, lino, _error);

  *ppReader = p;

_error:
  if (code != 0) {
    bseError("failed to init table meta reader %s at line %d since %s", pMeta->name, lino, tstrerror(code));
    tableMetaReaderClose(p);
  }
  return code;
}
// Close the reader's file and release its handle list, block buffer and the
// reader object itself.  NULL is ignored.
void tableMetaReaderClose(SBtableMetaReader *p) {
  if (p != NULL) {
    taosCloseFile(&p->pFile);
    taosArrayDestroy(p->pBlkHandle);
    blockWrapperCleanup(&p->blockWrapper);
    taosMemoryFree(p);
  }
}
// Find the meta entry that covers sequence number `seq`.
//
// The reader's block-handle list is searched (TD_LE, using compareFunc) with
// a probe handle whose range is [seq, seq]; the selected meta block is loaded
// into the reader's buffer and blockSeekMeta fills *pMetaBlock from it.
//
// Returns 0 on success, TSDB_CODE_NOT_FOUND when no handle is selected, or
// the error from loading / seeking.
int32_t tableMetaReaderLoadBlockMeta(SBtableMetaReader *p, int64_t seq, SMetaBlock *pMetaBlock) {
  SBlkHandle probe = {.range = {.sseq = seq, .eseq = seq}};

  int32_t     pos = taosArraySearchIdx(p->pBlkHandle, &probe, compareFunc, TD_LE);
  SBlkHandle *pFound = taosArrayGet(p->pBlkHandle, pos);
  if (pFound == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  int32_t code = tableLoadBlock(p->pFile, pFound, &p->blockWrapper);
  if (code == 0) {
    code = blockSeekMeta(p->blockWrapper.data, seq, pMetaBlock);
  }
  return code;
}
// Expand every meta block of the file into data-block handles.
//
// Each meta block listed in p->pBlkHandle is loaded, its SMetaBlock entries
// are extracted with blockGetAllMeta into a scratch array, and one SBlkHandle
// (offset, size, range) per entry is appended to dataHandle.
//
// NOTE(review): the scratch array is not cleared between meta blocks; if
// blockGetAllMeta appends rather than replaces, entries of earlier blocks
// would be pushed again for later blocks - confirm blockGetAllMeta's
// contract.
//
// Returns 0 on success, terrno on allocation failure,
// TSDB_CODE_FILE_CORRUPTED when a handle is missing or a meta block yields no
// entries, or the error from loading / decoding a block.
int32_t tableMetaReaderLoadAllDataHandle(SBtableMetaReader *p, SArray *dataHandle) {
  int32_t lino = 0;
  int32_t code = 0;

  SArray *pMeta = taosArrayInit(8, sizeof(SMetaBlock));
  if (pMeta == NULL) {
    // Without the scratch array nothing below can work; report the
    // allocation failure rather than a misleading corruption error.
    return terrno;
  }

  for (int32_t i = 0; i < taosArrayGetSize(p->pBlkHandle); i++) {
    SBlkHandle *pHandle = taosArrayGet(p->pBlkHandle, i);
    if (pHandle == NULL) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    code = tableLoadBlock(p->pFile, pHandle, &p->blockWrapper);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = blockGetAllMeta(p->blockWrapper.data, pMeta);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (taosArrayGetSize(pMeta) == 0) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    for (int32_t j = 0; j < taosArrayGetSize(pMeta); j++) {
      SMetaBlock *pBlk = taosArrayGet(pMeta, j);
      SBlkHandle  handle = {.offset = pBlk->offset, .size = pBlk->size, .range = pBlk->range};
      if (taosArrayPush(dataHandle, &handle) == NULL) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }
    }
  }

_exit:
  taosArrayDestroy(pMeta);
  return code;
}
// Copy every meta-block handle of the reader into pMetaHandle.
//
// Returns 0 on success, TSDB_CODE_NOT_FOUND when the reader has no handles,
// TSDB_CODE_FILE_CORRUPTED when a handle cannot be fetched, or terrno when a
// push fails.
int32_t tableMetaReaderLoadMetaHandle(SBtableMetaReader *p, SArray *pMetaHandle) {
  if (taosArrayGetSize(p->pBlkHandle) == 0) {
    return TSDB_CODE_NOT_FOUND;
  }

  int32_t code = 0;
  int32_t idx = 0;
  while (idx < taosArrayGetSize(p->pBlkHandle)) {
    SBlkHandle *pSrc = taosArrayGet(p->pBlkHandle, idx);
    if (pSrc == NULL) {
      return TSDB_CODE_FILE_CORRUPTED;
    }

    if (taosArrayPush(pMetaHandle, pSrc) == NULL) {
      code = terrno;
      if (code != 0) {
        return code;
      }
    }
    idx++;
  }
  return code;
}
// Load the meta index block referenced by the footer and decode it into the
// reader's block-handle list.
//
// When the reader has no open file (pFile == NULL) there is nothing to load
// and 0 is returned.  The loaded block must carry the
// BSE_TABLE_META_INDEX_TYPE tag; handles are then decoded back to back until
// the block's payload length is consumed.
//
// NOTE(review): the decode loop runs at least once, so an index block with an
// empty payload would still produce one handle - confirm that such a block
// cannot occur.
//
// Returns 0 on success, the tableLoadBlock error, TSDB_CODE_FILE_CORRUPTED
// for an unexpected block type, or terrno when a handle cannot be stored.
int32_t tableMetaReaderLoadIndex(SBtableMetaReader *p) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t offset = 0;

  if (p->pFile == NULL) {
    return 0;
  }
  SBtableMetaReader *pMeta = p;

  p->blockWrapper.type = BSE_TABLE_META_TYPE;
  code = tableLoadBlock(pMeta->pFile, pMeta->footer.metaHandle, &p->blockWrapper);
  TSDB_CHECK_CODE(code, lino, _error);

  if (blockGetType(p->blockWrapper.data) != BSE_TABLE_META_INDEX_TYPE) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  SBlock  *pBlk = (SBlock *)p->blockWrapper.data;
  uint8_t *data = (uint8_t *)pBlk->data;
  do {
    SBlkHandle handle = {0};
    offset += blkHandleDecode(&handle, (char *)data + offset);
    if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
      // Record the failure in code; otherwise the function would jump out of
      // the loop and still report success.
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }
  } while (offset < pBlk->len);

_error:
  if (code != 0) {
    bseError("failed to load table meta blk handle %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
// Create an iterator over the meta blocks of pReader.
//
// The iterator gets its own 1024-byte block buffer and references the reader
// without owning it.  If the reader has no block handles the iterator is
// created already finished (isOver = 1).
//
// On success *pIter receives the iterator; release it with
// tableMetaReaderIterClose.  On failure nothing is leaked and *pIter is not
// written.
//
// Returns 0 on success, terrno when the iterator cannot be allocated, or the
// blockWrapperInit error.
int32_t tableMetaReaderOpenIter(SBtableMetaReader *pReader, SBtableMetaReaderIter **pIter) {
  int32_t code = 0;

  SBtableMetaReaderIter *p = taosMemCalloc(1, sizeof(SBtableMetaReaderIter));
  if (p == NULL) {
    return terrno;
  }
  p->pReader = pReader;

  code = blockWrapperInit(&p->pBlockWrapper, 1024);
  if (code != 0) {
    // The iterator has not been published yet, so it must be freed here.
    taosMemoryFree(p);
    return code;
  }

  if (taosArrayGetSize(pReader->pBlkHandle) == 0) {
    p->isOver = 1;
  }
  *pIter = p;
  return 0;
}
// Load the next meta block of the reader into the iterator's buffer.
//
// On success *pDataWrapper receives a shallow copy of the iterator's wrapper
// (it shares the iterator's buffer) and *dstHandle a copy of the block's
// handle.  The iterator is marked finished, without touching the outputs,
// when the handle list is exhausted or when the loaded block is not tagged
// BSE_TABLE_META_TYPE.
//
// Returns 0 on success, TSDB_CODE_FILE_CORRUPTED when a handle cannot be
// fetched, or the resize / load error; on those latter errors the iterator's
// reader pointer is cleared.
int32_t tableMetaReaderIterNext(SBtableMetaReaderIter *pIter, SBlockWrapper *pDataWrapper, SBlkHandle *dstHandle) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pIter->blkIdx >= taosArrayGetSize(pIter->pReader->pBlkHandle)) {
    pIter->isOver = 1;
    return 0;
  }

  SBlkHandle *pCurrent = taosArrayGet(pIter->pReader->pBlkHandle, pIter->blkIdx);
  if (pCurrent == NULL) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  SBlockWrapper *pBuf = &pIter->pBlockWrapper;

  code = blockWrapperResize(pBuf, pCurrent->size);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadBlock(pIter->pReader->pFile, pCurrent, pBuf);
  TSDB_CHECK_CODE(code, lino, _error);

  pIter->blkIdx++;

  if (blockGetType(pBuf->data) != BSE_TABLE_META_TYPE) {
    // Anything other than a meta block ends the iteration.
    pIter->isOver = 1;
    return 0;
  }

  *pDataWrapper = *pBuf;
  *dstHandle = *pCurrent;

_error:
  if (code != 0) {
    bseError("failed to load table meta blk handle %s at line %d since %s", pIter->pReader->name, lino,
             tstrerror(code));
    pIter->pReader = NULL;
  }
  return code;
}
// Release the iterator's block buffer and the iterator itself.  The reader it
// references is not closed here.  NULL is ignored.
void tableMetaReaderIterClose(SBtableMetaReaderIter *p) {
  if (p != NULL) {
    blockWrapperCleanup(&p->pBlockWrapper);
    taosMemoryFree(p);
  }
}