mirror of
https://github.com/taosdata/TDengine
synced 2026-05-24 10:09:01 +00:00
210 lines
7.5 KiB
C
210 lines
7.5 KiB
C
/*
|
|
* perm_entropy — TDengine C UDF: aggregate permutation entropy
|
|
*
|
|
* Permutation entropy (Bandt & Pompe, PRL 2002) measures the complexity
|
|
* of a time series by computing the Shannon entropy of the distribution of
|
|
* ordinal patterns found in overlapping embedding windows of dimension
|
|
* EMBED_DIM. The result is normalised to [0, 1].
|
|
*
|
|
* This file is the CI build copy used by the function-test suite.
|
|
* The canonical documented version lives at docs/examples/udf/perm_entropy.c.
|
|
*/
|
|
|
|
#include <math.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include "taosudf.h"
|
|
|
|
#define EMBED_DIM 5
|
|
#define DELAY 1
|
|
#define MAX_EMBED_DIM 8
|
|
|
|
typedef struct {
|
|
int embed_dim;
|
|
int delay;
|
|
int64_t values_count;
|
|
int64_t values_capacity;
|
|
double *values;
|
|
} PermEntropyState;
|
|
|
|
static int32_t ensure_capacity(PermEntropyState *state, int64_t required) {
|
|
if (required <= state->values_capacity) return TSDB_CODE_SUCCESS;
|
|
|
|
int64_t new_cap = state->values_capacity > 0 ? state->values_capacity : 1024;
|
|
while (new_cap < required) {
|
|
if (new_cap > INT64_MAX / 2) { new_cap = required; break; }
|
|
new_cap *= 2;
|
|
}
|
|
if (new_cap > (int64_t)(SIZE_MAX / sizeof(double))) return TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
double *p = (double *)realloc(state->values, (size_t)new_cap * sizeof(double));
|
|
if (p == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
|
state->values = p;
|
|
state->values_capacity = new_cap;
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|
|
|
|
static double compute_perm_entropy(const double *data, int n, int embed_dim, int delay) {
|
|
if (data == NULL || n < embed_dim || embed_dim <= 1 || embed_dim > MAX_EMBED_DIM) return 0.0;
|
|
|
|
int n_windows = n - (embed_dim - 1) * delay;
|
|
if (n_windows <= 0) return 0.0;
|
|
int n_patterns = 1;
|
|
for (int i = 2; i <= embed_dim; i++) n_patterns *= i;
|
|
|
|
int *counts = (int *)calloc(n_patterns, sizeof(int));
|
|
if (counts == NULL) return NAN;
|
|
for (int w = 0; w < n_windows; w++) {
|
|
double v[MAX_EMBED_DIM];
|
|
int idx[MAX_EMBED_DIM];
|
|
int rank[MAX_EMBED_DIM];
|
|
for (int j = 0; j < embed_dim; j++) {
|
|
v[j] = data[w + j * delay];
|
|
idx[j] = j;
|
|
}
|
|
for (int j = 0; j < embed_dim - 1; j++)
|
|
for (int k = j + 1; k < embed_dim; k++)
|
|
if (v[idx[j]] > v[idx[k]] || (v[idx[j]] == v[idx[k]] && idx[j] > idx[k])) {
|
|
int t = idx[j];
|
|
idx[j] = idx[k];
|
|
idx[k] = t;
|
|
}
|
|
for (int j = 0; j < embed_dim; j++) rank[idx[j]] = j;
|
|
int pat = 0;
|
|
for (int j = 0; j < embed_dim; j++) {
|
|
int c = 0;
|
|
for (int k = j + 1; k < embed_dim; k++)
|
|
if (rank[k] < rank[j]) c++;
|
|
pat = pat * (embed_dim - j) + c;
|
|
}
|
|
counts[pat]++;
|
|
}
|
|
|
|
double entropy = 0.0;
|
|
for (int i = 0; i < n_patterns; i++) {
|
|
if (counts[i] > 0) {
|
|
double p = (double)counts[i] / n_windows;
|
|
entropy -= p * log2(p);
|
|
}
|
|
}
|
|
free(counts);
|
|
|
|
double max_entropy = log2((double)n_patterns);
|
|
return max_entropy > 0 ? entropy / max_entropy : 0.0;
|
|
}
|
|
|
|
DLL_EXPORT int32_t perm_entropy_init() { return TSDB_CODE_SUCCESS; }
|
|
DLL_EXPORT int32_t perm_entropy_destroy() { return TSDB_CODE_SUCCESS; }
|
|
|
|
DLL_EXPORT int32_t perm_entropy_start(SUdfInterBuf *interBuf) {
|
|
if (interBuf->bufLen < (int32_t)sizeof(PermEntropyState))
|
|
return TSDB_CODE_UDF_INVALID_BUFSIZE;
|
|
PermEntropyState *state = (PermEntropyState *)interBuf->buf;
|
|
memset(state, 0, sizeof(PermEntropyState));
|
|
state->embed_dim = EMBED_DIM;
|
|
state->delay = DELAY;
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|
|
|
|
DLL_EXPORT int32_t perm_entropy(SUdfDataBlock *block, SUdfInterBuf *interBuf,
|
|
SUdfInterBuf *newInterBuf) {
|
|
if (block->numOfCols != 1) return TSDB_CODE_UDF_INVALID_INPUT;
|
|
SUdfColumn *col = block->udfCols[0];
|
|
|
|
switch (col->colMeta.type) {
|
|
case TSDB_DATA_TYPE_TINYINT:
|
|
case TSDB_DATA_TYPE_SMALLINT:
|
|
case TSDB_DATA_TYPE_INT:
|
|
case TSDB_DATA_TYPE_BIGINT:
|
|
case TSDB_DATA_TYPE_FLOAT:
|
|
case TSDB_DATA_TYPE_DOUBLE:
|
|
break;
|
|
default:
|
|
return TSDB_CODE_UDF_INVALID_INPUT;
|
|
}
|
|
|
|
int64_t valid = 0;
|
|
for (int32_t i = 0; i < block->numOfRows; i++)
|
|
if (!udfColDataIsNull(col, i)) valid++;
|
|
|
|
PermEntropyState newState = *(PermEntropyState *)interBuf->buf;
|
|
|
|
if (valid > 0) {
|
|
int32_t code = ensure_capacity(&newState, newState.values_count + valid);
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
PermEntropyState *orig = (PermEntropyState *)interBuf->buf;
|
|
free(orig->values);
|
|
orig->values = NULL;
|
|
return code;
|
|
}
|
|
for (int32_t i = 0; i < block->numOfRows; i++) {
|
|
if (udfColDataIsNull(col, i)) continue;
|
|
char *raw = udfColDataGetData(col, i);
|
|
double v = 0.0;
|
|
switch (col->colMeta.type) {
|
|
case TSDB_DATA_TYPE_TINYINT: v = (double)(*(int8_t *)raw); break;
|
|
case TSDB_DATA_TYPE_SMALLINT: v = (double)(*(int16_t *)raw); break;
|
|
case TSDB_DATA_TYPE_INT: v = (double)(*(int32_t *)raw); break;
|
|
case TSDB_DATA_TYPE_BIGINT: v = (double)(*(int64_t *)raw); break;
|
|
case TSDB_DATA_TYPE_FLOAT: v = (double)(*(float *)raw); break;
|
|
case TSDB_DATA_TYPE_DOUBLE: v = *(double *)raw; break;
|
|
default: continue;
|
|
}
|
|
newState.values[newState.values_count++] = v;
|
|
}
|
|
}
|
|
|
|
if (newInterBuf->buf == NULL ||
|
|
newInterBuf->bufLen < (int32_t)sizeof(PermEntropyState)) {
|
|
/* Free the heap array unconditionally and clear orig->values:
|
|
* - realloc moved array: newState.values is the new block;
|
|
* orig->values is already dangling (freed internally by realloc).
|
|
* - realloc in-place or no realloc: newState.values == orig->values;
|
|
* freeing once via newState.values is correct.
|
|
* Clear orig->values so freeUdfInterBuf() cannot double-free. */
|
|
PermEntropyState *orig = (PermEntropyState *)interBuf->buf;
|
|
free(newState.values);
|
|
newState.values = NULL;
|
|
if (orig != NULL) orig->values = NULL;
|
|
return TSDB_CODE_UDF_INVALID_BUFSIZE;
|
|
}
|
|
memcpy(newInterBuf->buf, &newState, sizeof(PermEntropyState));
|
|
newInterBuf->bufLen = sizeof(PermEntropyState);
|
|
newInterBuf->numOfResult = 0;
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|
|
|
|
DLL_EXPORT int32_t perm_entropy_finish(SUdfInterBuf *interBuf,
|
|
SUdfInterBuf *resultData) {
|
|
if (interBuf->buf == NULL) { resultData->numOfResult = 0; return TSDB_CODE_SUCCESS; }
|
|
|
|
PermEntropyState *state = (PermEntropyState *)interBuf->buf;
|
|
|
|
if (state->values_count < state->embed_dim || state->values == NULL) {
|
|
if (state->values != NULL) { free(state->values); state->values = NULL; }
|
|
resultData->numOfResult = 0;
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|
|
|
|
double entropy = compute_perm_entropy(state->values, (int)state->values_count,
|
|
state->embed_dim, state->delay);
|
|
|
|
if (isnan(entropy)) {
|
|
free(state->values); state->values = NULL;
|
|
return TSDB_CODE_OUT_OF_MEMORY;
|
|
}
|
|
|
|
if (resultData->buf == NULL || resultData->bufLen < (int32_t)sizeof(double)) {
|
|
free(state->values); state->values = NULL;
|
|
return TSDB_CODE_UDF_INVALID_BUFSIZE;
|
|
}
|
|
*(double *)resultData->buf = entropy;
|
|
resultData->bufLen = sizeof(double);
|
|
resultData->numOfResult = 1;
|
|
|
|
free(state->values);
|
|
state->values = NULL;
|
|
state->values_count = 0;
|
|
state->values_capacity = 0;
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|