feat: Add materialized view support (Beta) (#1507)

Closes HDX-3082

# Summary

This PR back-ports support for materialized views from the EE repo. Note that this feature is in **Beta**, and is subject to significant changes.

This feature is intended to support:

1. Configuring AggregatingMergeTree (or SummingMergeTree) Materialized Views which are associated with a Source
2. Automatically selecting and querying an associated materialized view when a query supports it, in Chart Explorer, Custom Dashboards, the Services Dashboard, and the Search Page Histogram.
3. A UX for understanding what materialized views are available for a source, and whether (and why) it is or is not being used for a particular visualization.

## Note to Reviewer(s)

This is a large PR, but the code has largely already been reviewed.

- For net-new files, types, components, and utility functions, the code does not differ from the EE repo
- Changes to the various services dashboard pages do not differ from the EE repo
- Changes to `useOffsetPaginatedQuery`, `useChartConfig`, and `DBEditTimeChart` differ slightly due to unrelated (to MVs) drift between this repo and the EE repo, and due to the lack of feature toggles in this repo. **This is where slightly closer review would be most valuable.**

## Demo

<details>
<summary>Demo: MV Configuration</summary>

https://github.com/user-attachments/assets/fedf3bcf-892c-4b8d-a788-7e231e23bcc3
</details>

<details>
<summary>Demo: Chart Explorer</summary>

https://github.com/user-attachments/assets/fc8d1efa-7edc-42fc-98f0-75431cc056b8
</details>

<details>
<summary>Demo: Dashboards</summary>

https://github.com/user-attachments/assets/f3cb247e-711f-4d90-95b8-cf977e94f065
</details>

## Known Limitations

This feature is in Beta due to the following known limitations, which will be addressed in subsequent PRs:

1. Visualization start and end time, when not aligned with the granularity of MVs, will result in statistics based on the MV "time buckets" which fall inside the date range. This may not align exactly with the source table data which is in the selected date range.
2. Alerts do not make use of MVs, even if the associated visualization does. Due to (1), this means that alert values may not exactly match the values shown in the associated visualization.

## Differences in OSS vs EE Support

 - In OSS, there is a beta label on the MV configurations section
 - In EE there are feature toggles to enable MV support, in OSS the feature is enabled for all teams, but will only run for sources with MVs configured.

## Testing

To test, a couple of MVs can be created on the default `otel_traces` table, directly in ClickHouse:

<details>

<summary>Example MVs DDL</summary>

```sql
CREATE TABLE default.metrics_rollup_1m
(
    `Timestamp` DateTime,
    `ServiceName` LowCardinality(String),
    `SpanKind` LowCardinality(String),
    `StatusCode` LowCardinality(String),
    `count` SimpleAggregateFunction(sum, UInt64),
    `sum__Duration` SimpleAggregateFunction(sum, UInt64),
    `avg__Duration` AggregateFunction(avg, UInt64),
    `quantile__Duration` AggregateFunction(quantileTDigest(0.5), UInt64),
    `min__Duration` SimpleAggregateFunction(min, UInt64),
    `max__Duration` SimpleAggregateFunction(max, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toDate(Timestamp)
ORDER BY (Timestamp, StatusCode, SpanKind, ServiceName);

CREATE MATERIALIZED VIEW default.metrics_rollup_1m_mv TO default.metrics_rollup_1m
(
    `Timestamp` DateTime,
    `ServiceName` LowCardinality(String),
    `SpanKind` LowCardinality(String),
    `version` LowCardinality(String),
    `StatusCode` LowCardinality(String),
    `count` UInt64,
    `sum__Duration` Int64,
    `avg__Duration` AggregateFunction(avg, UInt64),
    `quantile__Duration` AggregateFunction(quantileTDigest(0.5), UInt64),
    `min__Duration` SimpleAggregateFunction(min, UInt64),
    `max__Duration` SimpleAggregateFunction(max, UInt64)
)
AS SELECT
    toStartOfMinute(Timestamp) AS Timestamp,
    ServiceName,
    SpanKind,
    StatusCode,
    count() AS count,
    sum(Duration) AS sum__Duration,
    avgState(Duration) AS avg__Duration,
    quantileTDigestState(0.5)(Duration) AS quantile__Duration,
    minSimpleState(Duration) AS min__Duration,
    maxSimpleState(Duration) AS max__Duration
FROM default.otel_traces
GROUP BY
    Timestamp,
    ServiceName,
    SpanKind,
    StatusCode;
```

```sql
CREATE TABLE default.span_kind_rollup_1m
(
    `Timestamp` DateTime,
    `ServiceName` LowCardinality(String),
    `SpanKind` LowCardinality(String),
    `histogram__Duration` AggregateFunction(histogram(20), UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toDate(Timestamp)
ORDER BY (Timestamp, ServiceName, SpanKind);

CREATE MATERIALIZED VIEW default.span_kind_rollup_1m_mv TO default.span_kind_rollup_1m
(
    `Timestamp` DateTime,
    `ServiceName` LowCardinality(String),
    `SpanKind` LowCardinality(String),
    `histogram__Duration` AggregateFunction(histogram(20), UInt64)
)
AS SELECT
    toStartOfMinute(Timestamp) AS Timestamp,
    ServiceName,
    SpanKind,
    histogramState(20)(Duration) AS histogram__Duration
FROM default.otel_traces
GROUP BY
    Timestamp,
    ServiceName,
    SpanKind;
```
</details>

Then you'll need to configure the materialized views in your source settings:

<details>
<summary>Source Configuration (should auto-infer when MVs are selected)</summary>

<img width="949" height="1011" alt="Screenshot 2025-12-19 at 10 26 54 AM" src="https://github.com/user-attachments/assets/fc46a1b9-de8b-4b95-a8ef-ba5fee905685" />

</details>
This commit is contained in:
Drew Davis 2025-12-19 11:17:23 -05:00 committed by GitHub
parent 99e7ce257f
commit a5a04aa92c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
27 changed files with 3777 additions and 117 deletions

View file

@ -0,0 +1,7 @@
---
"@hyperdx/common-utils": minor
"@hyperdx/api": minor
"@hyperdx/app": minor
---
feat: Add materialized view support (Beta)

View file

@ -72,6 +72,9 @@ export const Source = mongoose.model<ISource>(
highlightedRowAttributeExpressions: {
type: mongoose.Schema.Types.Array,
},
materializedViews: {
type: mongoose.Schema.Types.Array,
},
metricTables: {
type: {

View file

@ -75,6 +75,7 @@ import {
} from '@/dashboard';
import DBSqlRowTableWithSideBar from './components/DBSqlRowTableWithSidebar';
import MVOptimizationIndicator from './components/MaterializedViews/MVOptimizationIndicator';
import OnboardingModal from './components/OnboardingModal';
import { Tags } from './components/Tags';
import useDashboardFilters from './hooks/useDashboardFilters';
@ -256,66 +257,77 @@ const Tile = forwardRef(
<Text size="sm" ms="xs">
{chart.config.name}
</Text>
{hovered ? (
<Flex gap="0px">
{(chart.config.displayType === DisplayType.Line ||
chart.config.displayType === DisplayType.StackedBar) && (
<Indicator
size={alert?.state === AlertState.OK ? 6 : 8}
zIndex={1}
color={alertIndicatorColor}
processing={alert?.state === AlertState.ALERT}
label={!alert && <span className="fs-8">+</span>}
mr={4}
>
<Tooltip label={alertTooltip} withArrow>
<Button
data-testid={`tile-alerts-button-${chart.id}`}
variant="subtle"
color="gray"
size="xxs"
onClick={onEditClick}
>
<IconBell size={16} />
</Button>
</Tooltip>
</Indicator>
)}
<Group>
{hovered ? (
<Flex gap="0px" onMouseDown={e => e.stopPropagation()}>
{(chart.config.displayType === DisplayType.Line ||
chart.config.displayType === DisplayType.StackedBar) && (
<Indicator
size={alert?.state === AlertState.OK ? 6 : 8}
zIndex={1}
color={alertIndicatorColor}
processing={alert?.state === AlertState.ALERT}
label={!alert && <span className="fs-8">+</span>}
mr={4}
>
<Tooltip label={alertTooltip} withArrow>
<Button
data-testid={`tile-alerts-button-${chart.id}`}
variant="subtle"
color="gray"
size="xxs"
onClick={onEditClick}
>
<IconBell size={16} />
</Button>
</Tooltip>
</Indicator>
)}
<Button
data-testid={`tile-duplicate-button-${chart.id}`}
variant="subtle"
color="gray"
size="xxs"
onClick={onDuplicateClick}
title="Duplicate"
>
<IconCopy size={14} />
</Button>
<Button
data-testid={`tile-edit-button-${chart.id}`}
variant="subtle"
color="gray"
size="xxs"
onClick={onEditClick}
title="Edit"
>
<IconPencil size={14} />
</Button>
<Button
data-testid={`tile-delete-button-${chart.id}`}
variant="subtle"
color="gray"
size="xxs"
onClick={onDeleteClick}
title="Delete"
>
<IconTrash size={14} />
</Button>
</Flex>
) : (
<Box h={22} />
)}
<Button
data-testid={`tile-duplicate-button-${chart.id}`}
variant="subtle"
color="gray"
size="xxs"
onClick={onDuplicateClick}
title="Duplicate"
>
<IconCopy size={14} />
</Button>
<Button
data-testid={`tile-edit-button-${chart.id}`}
variant="subtle"
color="gray"
size="xxs"
onClick={onEditClick}
title="Edit"
>
<IconPencil size={14} />
</Button>
<Button
data-testid={`tile-delete-button-${chart.id}`}
variant="subtle"
color="gray"
size="xxs"
onClick={onDeleteClick}
title="Delete"
>
<IconTrash size={14} />
</Button>
</Flex>
) : (
<Box h={22} />
)}
{source?.materializedViews?.length && queriedConfig && (
<Box onMouseDown={e => e.stopPropagation()}>
<MVOptimizationIndicator
config={queriedConfig}
source={source}
variant="icon"
/>
</Box>
)}
</Group>
</div>
<div
className="fs-7 text-muted flex-grow-1 overflow-hidden"

View file

@ -518,6 +518,7 @@ function useSearchedConfigToChartConfig({
data: {
select: select || (sourceObj.defaultTableSelectExpression ?? ''),
from: sourceObj.from,
source: sourceObj.id,
...(sourceObj.tableFilterExpression != null
? {
filters: [

View file

@ -138,6 +138,7 @@ function ServiceSelectControlled({
const { expressions } = useServiceDashboardExpressions({ source });
const queriedConfig = {
source: source?.id,
timestampValueExpression: source?.timestampValueExpression || '',
from: {
databaseName: source?.from.databaseName || '',
@ -250,6 +251,7 @@ export function EndpointLatencyChart({
'avg_duration_ns',
]}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',
@ -303,6 +305,7 @@ export function EndpointLatencyChart({
) : (
<DBHistogramChart
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',
@ -311,9 +314,15 @@ export function EndpointLatencyChart({
where: appliedConfig.where || '',
whereLanguage: appliedConfig.whereLanguage || 'sql',
select: [
{
alias: 'data_nanoseconds',
aggFn: 'histogram',
level: 20,
valueExpression: expressions.duration,
},
{
alias: 'data',
valueExpression: `histogram(20)(${expressions.durationInMillis})`,
valueExpression: `arrayMap(bin -> (bin.1 / ${expressions.durationDivisorForMillis}, bin.2 / ${expressions.durationDivisorForMillis}, bin.3), data_nanoseconds)`,
},
],
filters: [
@ -361,6 +370,7 @@ function HttpTab({
if (!source || !expressions) return null;
if (reqChartType === 'overall') {
return {
source: source.id,
...pick(source, ['timestampValueExpression', 'connection', 'from']),
where: appliedConfig.where || '',
whereLanguage: appliedConfig.whereLanguage || 'sql',
@ -392,6 +402,7 @@ function HttpTab({
return {
timestampValueExpression: 'series_time_bucket',
connection: source.connection,
source: source.id,
with: [
{
name: 'error_series',
@ -557,6 +568,7 @@ function HttpTab({
<DBTimeChart
sourceId={source.id}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',
@ -601,6 +613,7 @@ function HttpTab({
'error_requests',
]}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',
@ -726,6 +739,7 @@ function HttpTab({
'error_count',
]}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',
@ -951,6 +965,7 @@ function DatabaseTab({
dateRange: searchedTimeRange,
timestampValueExpression: 'series_time_bucket',
connection: source.connection,
source: source.id,
} satisfies ChartConfigWithDateRange;
}, [appliedConfig, expressions, searchedTimeRange, source]);
@ -1071,6 +1086,7 @@ function DatabaseTab({
dateRange: searchedTimeRange,
timestampValueExpression: 'series_time_bucket',
connection: source.connection,
source: source.id,
} satisfies ChartConfigWithDateRange;
}, [appliedConfig, expressions, searchedTimeRange, source]);
@ -1149,6 +1165,7 @@ function DatabaseTab({
'p50_duration_ns',
]}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',
@ -1229,6 +1246,7 @@ function DatabaseTab({
'p50_duration_ns',
]}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',
@ -1327,6 +1345,7 @@ function ErrorsTab({
<DBTimeChart
sourceId={source.id}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',

View file

@ -94,6 +94,7 @@ import {
import HDXMarkdownChart from '../HDXMarkdownChart';
import type { NumberFormat } from '../types';
import MVOptimizationIndicator from './MaterializedViews/MVOptimizationIndicator';
import { AggFnSelectControlled } from './AggFnSelect';
import DBNumberChart from './DBNumberChart';
import DBSqlRowTableWithSideBar from './DBSqlRowTableWithSidebar';
@ -537,6 +538,17 @@ export default function EditTimeChartForm({
const [queriedConfig, setQueriedConfig] = useState<
ChartConfigWithDateRange | undefined
>(undefined);
const [queriedSource, setQueriedSource] = useState<TSource | undefined>(
undefined,
);
const setQueriedConfigAndSource = useCallback(
(config: ChartConfigWithDateRange, source: TSource) => {
setQueriedConfig(config);
setQueriedSource(source);
},
[],
);
const [saveToDashboardModalOpen, setSaveToDashboardModalOpen] =
useState(false);
@ -570,16 +582,23 @@ export default function EditTimeChartForm({
? config.orderBy
: undefined,
};
setQueriedConfig(
setQueriedConfigAndSource(
// WARNING: DON'T JUST ASSIGN OBJECTS OR DO SPREAD OPERATOR STUFF WHEN
// YOUR STATE IS AN OBJECT. YOU'RE COPYING BY REFERENCE WHICH MIGHT
// ACCIDENTALLY CAUSE A useQuery SOMEWHERE TO FIRE A REQUEST EVERYTIME
// AN INPUT CHANGES. USE structuredClone TO PERFORM A DEEP COPY INSTEAD
structuredClone(newConfig),
tableSource,
);
}
})();
}, [handleSubmit, setChartConfig, setQueriedConfig, tableSource, dateRange]);
}, [
handleSubmit,
setChartConfig,
setQueriedConfigAndSource,
tableSource,
dateRange,
]);
const onTableSortingChange = useCallback(
(sortState: SortingState | null) => {
@ -674,6 +693,29 @@ export default function EditTimeChartForm({
const queryReady = isQueryReady(queriedConfig);
// The chart config to use when explaining to to the user whether and why
// their query is or is not being executed against a materialized view.
const chartConfigForMvOptimizationExplanation:
| ChartConfigWithDateRange
| undefined = useMemo(() => {
// If the user has submitted a query, us the submitted query, unless they have changed sources
if (queriedConfig && queriedSource?.id === tableSource?.id) {
return queriedConfig;
}
// If there is a chart config from the props (either a saved config or one from the URL params), use that,
// unless a different source has been selected.
return chartConfig && tableSource?.id === chartConfig.source
? {
...chartConfig,
dateRange,
timestampValueExpression: tableSource.timestampValueExpression,
from: tableSource.from,
connection: tableSource.connection,
}
: undefined;
}, [chartConfig, dateRange, tableSource, queriedConfig, queriedSource]);
const previousDateRange = getPreviousDateRange(dateRange);
const sampleEventsConfig = useMemo(
@ -786,19 +828,27 @@ export default function EditTimeChartForm({
</div>
) : (
<>
<Flex mb="md" align="center" gap="sm">
<Text pe="md" size="sm">
Data Source
</Text>
<SourceSelectControlled
size="xs"
control={control}
name="source"
data-testid="source-selector"
sourceSchemaPreview={
<SourceSchemaPreview source={tableSource} variant="text" />
}
/>
<Flex mb="md" align="center" gap="sm" justify="space-between">
<Group>
<Text pe="md" size="sm">
Data Source
</Text>
<SourceSelectControlled
size="xs"
control={control}
name="source"
data-testid="source-selector"
sourceSchemaPreview={
<SourceSchemaPreview source={tableSource} variant="text" />
}
/>
</Group>
{tableSource && activeTab !== 'search' && (
<MVOptimizationIndicator
source={tableSource}
config={chartConfigForMvOptimizationExplanation}
/>
)}
</Flex>
{displayType !== DisplayType.Search && Array.isArray(select) ? (

View file

@ -0,0 +1,79 @@
import { useMemo } from 'react';
import { splitAndTrimWithBracket } from '@hyperdx/common-utils/dist/core/utils';
import { MaterializedViewConfiguration } from '@hyperdx/common-utils/dist/types';
import { Group, Pill, Stack, Table, Text } from '@mantine/core';
export default function MVConfigSummary({
config,
}: {
config: MaterializedViewConfiguration;
}) {
const dimensionColumnsSplit = useMemo(
() => splitAndTrimWithBracket(config.dimensionColumns),
[config.dimensionColumns],
);
const columnsAndAggFns = useMemo(() => {
const map = new Map<string, string[]>();
for (const { sourceColumn = '', aggFn } of config.aggregatedColumns) {
if (map.has(sourceColumn)) {
map.get(sourceColumn)?.push(aggFn);
} else {
map.set(sourceColumn, [aggFn]);
}
}
return Array.from(map.entries()).sort(([sourceColA], [sourceColB]) =>
sourceColA.localeCompare(sourceColB),
);
}, [config.aggregatedColumns]);
return (
<Stack gap="md">
<div>
<Text size="sm" fw={500} mb="xs">
Minimum Granularity
</Text>
<Pill>{config.minGranularity}</Pill>
</div>
<div>
<Text size="sm" fw={500} mb="xs">
Available Group and Filter Columns
</Text>
<Group gap="xs">
{dimensionColumnsSplit.map(col => (
<Pill key={col}>{col}</Pill>
))}
</Group>
</div>
<div>
<Text size="sm" fw={500} mb="sm">
Available Aggregated Columns
</Text>
<Table>
<Table.Thead>
<Table.Tr>
<Table.Th>Column</Table.Th>
<Table.Th>Aggregation</Table.Th>
</Table.Tr>
</Table.Thead>
<Table.Tbody>
{columnsAndAggFns.map(([sourceColumn, aggFns]) => (
<Table.Tr key={sourceColumn}>
<Table.Td>{sourceColumn}</Table.Td>
<Table.Td>
<Group gap="xs">
{aggFns.map(aggFn => (
<Pill key={aggFn}>{aggFn}</Pill>
))}
</Group>
</Table.Td>
</Table.Tr>
))}
</Table.Tbody>
</Table>
</div>
</Stack>
);
}

View file

@ -0,0 +1,99 @@
import { useState } from 'react';
import {
ChartConfigWithOptDateRange,
TSource,
} from '@hyperdx/common-utils/dist/types';
import { ActionIcon, Badge, Tooltip } from '@mantine/core';
import { IconBolt, IconBoltOff } from '@tabler/icons-react';
import { useMVOptimizationExplanation } from '@/hooks/useMVOptimizationExplanation';
import MVOptimizationModal from './MVOptimizationModal';
const WARNING_COLOR = 'var(--color-bg-warning)';
const SUCCESS_COLOR = 'var(--color-bg-success)';
function MVOptimizationIcon({
isInWarningState,
onClick,
}: {
isInWarningState: boolean;
onClick: () => void;
}) {
return isInWarningState ? (
<Tooltip label="Not Accelerated">
<ActionIcon onClick={onClick}>
<IconBoltOff size={16} color={WARNING_COLOR} />
</ActionIcon>
</Tooltip>
) : (
<Tooltip label="Accelerated">
<ActionIcon onClick={onClick}>
<IconBolt size={18} color={SUCCESS_COLOR} />
</ActionIcon>
</Tooltip>
);
}
function MVOptimizationBadge({
isInWarningState,
onClick,
}: {
isInWarningState: boolean;
onClick: () => void;
}) {
return (
<Badge
color={isInWarningState ? WARNING_COLOR : SUCCESS_COLOR}
onClick={onClick}
className="cursor-pointer"
>
{isInWarningState ? 'Not Accelerated' : 'Accelerated'}
</Badge>
);
}
export default function MVOptimizationIndicator({
source,
config,
variant = 'badge',
}: {
source: TSource;
config: ChartConfigWithOptDateRange | undefined;
variant?: 'badge' | 'icon';
}) {
const [modalOpen, setModalOpen] = useState(false);
const { data } = useMVOptimizationExplanation(config);
const mvConfigs = source.materializedViews ?? [];
if (!mvConfigs?.length) {
return null;
}
const isInWarningState = !!config && !!data && !data?.optimizedConfig;
return (
<>
{variant === 'icon' ? (
<MVOptimizationIcon
isInWarningState={isInWarningState}
onClick={() => setModalOpen(true)}
/>
) : (
<MVOptimizationBadge
isInWarningState={isInWarningState}
onClick={() => setModalOpen(true)}
/>
)}
{data && (
<MVOptimizationModal
mvConfigs={mvConfigs}
explanations={data.explanations}
opened={modalOpen}
onClose={() => setModalOpen(false)}
/>
)}
</>
);
}

View file

@ -0,0 +1,122 @@
import { useMemo } from 'react';
import { MVOptimizationExplanation } from '@hyperdx/common-utils/dist/core/materializedViews';
import { MaterializedViewConfiguration } from '@hyperdx/common-utils/dist/types';
import {
Accordion,
Alert,
Badge,
Group,
Modal,
Text,
Tooltip,
} from '@mantine/core';
import MVConfigSummary from './MVConfigSummary';
const WARNING_COLOR = 'var(--color-bg-warning)';
const SUCCESS_COLOR = 'var(--color-bg-success)';
function mvConfigToKey(config: MaterializedViewConfiguration) {
return `mv-${config.databaseName}-${config.tableName}`;
}
export default function MVOptimizationModal({
mvConfigs,
explanations,
opened,
onClose,
}: {
mvConfigs: MaterializedViewConfiguration[];
explanations: MVOptimizationExplanation[];
opened: boolean;
onClose: () => void;
}) {
const hasMultipleMVs = mvConfigs.length > 1;
const explanationsByKey = useMemo(
() => new Map(explanations.map(e => [mvConfigToKey(e.mvConfig), e])),
[explanations],
);
const firstUsedMv = explanations.find(e => e.success)?.mvConfig;
return (
<Modal
title={
<Group>
{hasMultipleMVs ? 'Materialized Views' : 'Materialized View'}
<Badge size="sm" radius="sm" color="gray">
Beta
</Badge>
</Group>
}
opened={opened}
onClose={onClose}
size="lg"
>
<Text size="sm" mb="sm">
This source is configured with{' '}
{hasMultipleMVs ? 'materialized views' : 'a materialized view'} for
accelerating some aggregations.
</Text>
<Accordion defaultValue={firstUsedMv && mvConfigToKey(firstUsedMv)}>
{mvConfigs.map(config => {
const key = mvConfigToKey(config);
const explanation = explanationsByKey.get(key);
const hasErrors = !!explanation?.errors.length;
const isBeingUsedByOptimizedConfig = explanation?.success;
const rowEstimate =
explanation?.rowEstimate?.toLocaleString() ?? 'N/A';
return (
<Accordion.Item value={key} key={key}>
<Accordion.Control px="xs">
<Group justify="space-between">
<Text>{config.tableName}</Text>
{isBeingUsedByOptimizedConfig ? (
<Tooltip label={`Estimated rows scanned: ${rowEstimate}`}>
<Badge me="md" color={SUCCESS_COLOR}>
Active
</Badge>
</Tooltip>
) : hasErrors ? (
<Tooltip label="This materialized view is not compatible with the selected query.">
<Badge me="md" color={WARNING_COLOR}>
Incompatible
</Badge>
</Tooltip>
) : explanation ? (
<Tooltip label={`Estimated rows scanned: ${rowEstimate}`}>
<Badge me="md" color="gray">
Skipped
</Badge>
</Tooltip>
) : null}
</Group>
</Accordion.Control>
<Accordion.Panel>
<>
<MVConfigSummary config={config} />
{hasErrors && (
<Alert color="red" mt="xs">
<Text size="sm" fw={500} mb="xs">
The query cannot be accelerated using this materialized
view for the following reason(s):
</Text>
{explanation.errors.map((error, idx) => (
<Text size="sm" key={idx} mt="xs">
{error}
</Text>
))}
</Alert>
)}
</>
</Accordion.Panel>
</Accordion.Item>
);
})}
</Accordion>
</Modal>
);
}

View file

@ -98,6 +98,7 @@ export default function ServiceDashboardDbQuerySidePanel({
sourceId={sourceId}
hiddenSeries={['total_duration_ns']}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',
@ -135,6 +136,7 @@ export default function ServiceDashboardDbQuerySidePanel({
<DBTimeChart
sourceId={sourceId}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',

View file

@ -96,6 +96,7 @@ export default function ServiceDashboardEndpointPerformanceChart({
groupColumn="group"
valueColumn="Total Time Spent"
config={{
source: source.id,
...pick(source, ['timestampValueExpression', 'connection', 'from']),
where: '',
whereLanguage: 'sql',

View file

@ -47,7 +47,7 @@ export default function ServiceDashboardEndpointSidePanel({
const filters: Filter[] = [
{
type: 'sql',
condition: `${expressions.spanName} IN ('${endpoint}') AND ${expressions.isSpanKindServer}`,
condition: `${expressions.endpoint} IN ('${endpoint}') AND ${expressions.isSpanKindServer}`,
},
];
if (service) {
@ -105,6 +105,7 @@ export default function ServiceDashboardEndpointSidePanel({
sourceId={source.id}
hiddenSeries={['total_count', 'error_count']}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',
@ -149,6 +150,7 @@ export default function ServiceDashboardEndpointSidePanel({
<DBTimeChart
sourceId={source.id}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',

View file

@ -31,6 +31,7 @@ export default function SlowestEventsTile({
const { data, isLoading, isError, error } = useQueriedChartConfig(
{
source: source.id,
...pick(source, ['timestampValueExpression', 'connection', 'from']),
where: '',
whereLanguage: 'sql',
@ -110,6 +111,7 @@ export default function SlowestEventsTile({
breadcrumbPath={[{ label: 'Endpoint' }]}
sourceId={source.id}
config={{
source: source.id,
...pick(source, [
'timestampValueExpression',
'connection',

View file

@ -6,6 +6,7 @@ import {
useForm,
UseFormSetValue,
UseFormWatch,
useWatch,
} from 'react-hook-form';
import { z } from 'zod';
import {
@ -18,6 +19,7 @@ import {
import {
ActionIcon,
Anchor,
Badge,
Box,
Button,
Center,
@ -26,6 +28,7 @@ import {
Grid,
Group,
Radio,
Select,
Slider,
Stack,
Text,
@ -42,7 +45,6 @@ import {
import { SourceSelectControlled } from '@/components/SourceSelect';
import { IS_METRICS_ENABLED, IS_SESSIONS_ENABLED } from '@/config';
import { useConnections } from '@/connection';
import SearchInputV2 from '@/SearchInputV2';
import {
inferTableSourceConfig,
isValidMetricTable,
@ -53,16 +55,35 @@ import {
useSources,
useUpdateSource,
} from '@/source';
import {
inferMaterializedViewConfig,
MV_AGGREGATE_FUNCTIONS,
} from '@/utils/materializedViews';
import ConfirmDeleteMenu from './ConfirmDeleteMenu';
import { ConnectionSelectControlled } from './ConnectionSelect';
import { DatabaseSelectControlled } from './DatabaseSelect';
import { DBTableSelectControlled } from './DBTableSelect';
import { InputControlled } from './InputControlled';
import SelectControlled from './SelectControlled';
import { SQLInlineEditorControlled } from './SQLInlineEditor';
const DEFAULT_DATABASE = 'default';
const MV_GRANULARITY_OPTIONS = [
{ value: '1 second', label: '1 second' },
{ value: '1 minute', label: '1 minute' },
{ value: '5 minute', label: '5 minutes' },
{ value: '15 minute', label: '15 minutes' },
{ value: '1 hour', label: '1 hour' },
{ value: '1 day', label: '1 day' },
];
const MV_AGGREGATE_FUNCTION_OPTIONS = MV_AGGREGATE_FUNCTIONS.map(fn => ({
value: fn,
label: fn,
}));
// TODO: maybe otel clickhouse export migrate the schema?
const OTEL_CLICKHOUSE_EXPRESSIONS = {
timestampValueExpression: 'TimeUnix',
@ -264,6 +285,388 @@ function HighlightedAttributeExpressionsFormRow({
);
}
/** Component for configuring one or more materialized views */
function MaterializedViewsFormSection({
control,
watch,
setValue,
}: TableModelProps) {
const databaseName =
useWatch({ control, name: `from.databaseName` }) || DEFAULT_DATABASE;
const {
fields: materializedViews,
append: appendMaterializedView,
remove: removeMaterializedView,
} = useFieldArray({
control,
name: 'materializedViews',
});
return (
<Stack gap="md">
<FormRow
label={
<Group>
Materialized Views
<Badge size="sm" radius="sm" color="gray">
Beta
</Badge>
</Group>
}
helpText="Configure materialized views for query optimization. These pre-aggregated views can significantly improve query performance on aggregation queries."
>
<Stack gap="md">
{materializedViews.map((field, index) => (
<MaterializedViewFormSection
key={field.id}
watch={watch}
control={control}
mvIndex={index}
setValue={setValue}
onRemove={() => removeMaterializedView(index)}
/>
))}
<Button
variant="default"
onClick={() => {
appendMaterializedView({
databaseName: databaseName,
tableName: '',
dimensionColumns: '',
minGranularity: '',
timestampColumn: '',
aggregatedColumns: [],
});
}}
>
<Group>
<IconCirclePlus size={16} />
Add Materialized View
</Group>
</Button>
</Stack>
</FormRow>
</Stack>
);
}
/** Component for configuring a single materialized view */
function MaterializedViewFormSection({
watch,
control,
mvIndex,
onRemove,
setValue,
}: { mvIndex: number; onRemove: () => void } & TableModelProps) {
const connection = useWatch({ control, name: `connection` });
const sourceDatabaseName =
useWatch({ control, name: `from.databaseName` }) || DEFAULT_DATABASE;
const mvDatabaseName =
useWatch({ control, name: `materializedViews.${mvIndex}.databaseName` }) ||
sourceDatabaseName;
const mvTableName =
useWatch({ control, name: `materializedViews.${mvIndex}.tableName` }) || '';
return (
<Stack gap="sm">
<Grid columns={2} flex={1}>
<Grid.Col span={1}>
<DatabaseSelectControlled
control={control}
name={`materializedViews.${mvIndex}.databaseName`}
connectionId={connection}
/>
</Grid.Col>
<Grid.Col span={1}>
<Group>
<Box flex={1}>
<DBTableSelectControlled
database={mvDatabaseName}
control={control}
name={`materializedViews.${mvIndex}.tableName`}
connectionId={connection}
/>
</Box>
<ActionIcon size="sm" onClick={onRemove}>
<IconTrash size={16} />
</ActionIcon>
</Group>
</Grid.Col>
<Grid.Col span={1}>
<Text size="xs" fw={500} mb={4}>
Timestamp Column
</Text>
<SQLInlineEditorControlled
tableConnection={{
databaseName: mvDatabaseName,
tableName: mvTableName,
connectionId: connection,
}}
control={control}
placeholder="Timestamp"
name={`materializedViews.${mvIndex}.timestampColumn`}
disableKeywordAutocomplete
/>
</Grid.Col>
<Grid.Col span={1}>
<Text size="xs" fw={500} mb={4}>
Granularity
<Tooltip
label={'The granularity of the timestamp column'}
color="dark"
c="white"
multiline
maw={600}
>
<IconHelpCircle size={14} className="cursor-pointer ms-1" />
</Tooltip>
</Text>
<Controller
control={control}
name={`materializedViews.${mvIndex}.minGranularity`}
render={({ field }) => (
<Select
{...field}
data={MV_GRANULARITY_OPTIONS}
placeholder="Granularity"
size="sm"
/>
)}
/>
</Grid.Col>
</Grid>
<Box>
<Text size="xs" fw={500} mb={4}>
Dimension Columns (comma-separated)
<Tooltip
label={
'Columns which are not pre-aggregated in the materialized view and can be used for filtering and grouping.'
}
color="dark"
c="white"
multiline
maw={600}
>
<IconHelpCircle size={14} className="cursor-pointer ms-1" />
</Tooltip>
</Text>
<SQLInlineEditorControlled
tableConnection={{
databaseName: mvDatabaseName,
tableName: mvTableName,
connectionId: connection,
}}
control={control}
name={`materializedViews.${mvIndex}.dimensionColumns`}
placeholder="ServiceName, StatusCode"
disableKeywordAutocomplete
/>
</Box>
<AggregatedColumnsFormSection
control={control}
mvIndex={mvIndex}
watch={watch}
setValue={setValue}
/>
<Divider />
</Stack>
);
}
/** Component for configuring the Aggregated Columns list for a single materialized view */
function AggregatedColumnsFormSection({
control,
watch,
setValue,
mvIndex,
}: TableModelProps & { mvIndex: number }) {
const {
fields: aggregates,
append: appendAggregate,
remove: removeAggregate,
replace: replaceAggregates,
} = useFieldArray({
control,
name: `materializedViews.${mvIndex}.aggregatedColumns`,
});
const addAggregate = useCallback(() => {
appendAggregate({ sourceColumn: '', aggFn: 'avg', mvColumn: '' });
}, [appendAggregate]);
useEffect(() => {
const { unsubscribe } = watch(async (value, { name, type }) => {
try {
if (
(value.kind === SourceKind.Log || value.kind === SourceKind.Trace) &&
value.connection &&
value.materializedViews?.[mvIndex] &&
value.materializedViews[mvIndex].databaseName &&
value.materializedViews[mvIndex].tableName &&
value.from?.databaseName &&
value.from?.tableName &&
name === `materializedViews.${mvIndex}.tableName` &&
type === 'change'
) {
const mvDatabaseName = value.materializedViews[mvIndex].databaseName;
const mvTableName = value.materializedViews[mvIndex].tableName;
const config = await inferMaterializedViewConfig(
{
databaseName: mvDatabaseName,
tableName: mvTableName,
connectionId: value.connection,
},
{
databaseName: value.from.databaseName,
tableName: value.from.tableName,
connectionId: value.connection,
},
);
if (config) {
setValue(`materializedViews.${mvIndex}`, config);
replaceAggregates(config.aggregatedColumns ?? []);
notifications.show({
color: 'green',
message:
'Partially inferred materialized view configuration from view schema.',
});
} else {
notifications.show({
color: 'yellow',
message: 'Unable to infer materialized view configuration.',
});
}
}
} catch (e) {
console.error(e);
}
});
return () => unsubscribe();
}, [watch, mvIndex, replaceAggregates, setValue]);
return (
<Box>
<Text size="xs" mb={4}>
Pre-aggregated Columns
<Tooltip
label={'Columns which are pre-aggregated by the materialized view'}
color="dark"
c="white"
multiline
maw={600}
>
<IconHelpCircle size={14} className="cursor-pointer ms-1" />
</Tooltip>
</Text>
<Grid columns={10}>
{aggregates.map((field, colIndex) => (
<AggregatedColumnRow
key={field.id}
watch={watch}
setValue={setValue}
control={control}
mvIndex={mvIndex}
colIndex={colIndex}
onRemove={() => removeAggregate(colIndex)}
/>
))}
</Grid>
<Button size="sm" variant="default" onClick={addAggregate} mt="lg">
<Group>
<IconCirclePlus size={16} />
Add Column
</Group>
</Button>
</Box>
);
}
/** Component to render one row in the MV Aggregated Columns section */
function AggregatedColumnRow({
control,
mvIndex,
colIndex,
onRemove,
}: TableModelProps & {
mvIndex: number;
colIndex: number;
onRemove: () => void;
}) {
const connectionId = useWatch({ control, name: `connection` });
const sourceDatabaseName =
useWatch({ control, name: `from.databaseName` }) || DEFAULT_DATABASE;
const sourceTableName = useWatch({ control, name: `from.tableName` });
const mvDatabaseName =
useWatch({ control, name: `materializedViews.${mvIndex}.databaseName` }) ||
sourceDatabaseName;
const mvTableName = useWatch({
control,
name: `materializedViews.${mvIndex}.tableName`,
});
const isCount =
useWatch({
control,
name: `materializedViews.${mvIndex}.aggregatedColumns.${colIndex}.aggFn`,
}) === 'count';
return (
<>
<Grid.Col span={2}>
<SelectControlled
control={control}
name={`materializedViews.${mvIndex}.aggregatedColumns.${colIndex}.aggFn`}
data={MV_AGGREGATE_FUNCTION_OPTIONS}
size="sm"
/>
</Grid.Col>
{!isCount && (
<Grid.Col span={4}>
<SQLInlineEditorControlled
tableConnection={{
databaseName: sourceDatabaseName,
tableName: sourceTableName,
connectionId,
}}
control={control}
name={`materializedViews.${mvIndex}.aggregatedColumns.${colIndex}.sourceColumn`}
placeholder="Source Column"
disableKeywordAutocomplete
/>
</Grid.Col>
)}
<Grid.Col span={!isCount ? 4 : 8}>
<Group wrap="nowrap">
<Box flex={1}>
<SQLInlineEditorControlled
tableConnection={{
databaseName: mvDatabaseName,
tableName: mvTableName,
connectionId,
}}
control={control}
name={`materializedViews.${mvIndex}.aggregatedColumns.${colIndex}.mvColumn`}
placeholder="View Column"
disableKeywordAutocomplete
/>
</Box>
<ActionIcon size="sm" onClick={onRemove}>
<IconTrash size={16} />
</ActionIcon>
</Group>
</Grid.Col>
</>
);
}
// traceModel= ...
// logModel=....
// traceModel.logModel = 'custom'
@ -514,6 +917,8 @@ export function LogTableModelForm(props: TableModelProps) {
label="Highlighted Trace Attributes"
helpText="Expressions defining trace-level attributes which are displayed in the trace view for the selected trace."
/>
<Divider />
<MaterializedViewsFormSection {...props} />
</Stack>
</>
);
@ -794,6 +1199,8 @@ export function TraceTableModelForm(props: TableModelProps) {
label="Highlighted Trace Attributes"
helpText="Expressions defining trace-level attributes which are displayed in the trace view for the selected trace."
/>
<Divider />
<MaterializedViewsFormSection {...props} />
</Stack>
);
}

View file

@ -93,7 +93,7 @@ const TableSchemaPreview = ({
export interface SourceSchemaPreviewProps {
source?: Pick<TSource, 'connection' | 'from' | 'metricTables'> &
Partial<Pick<TSource, 'kind' | 'name'>>;
Partial<Pick<TSource, 'kind' | 'name' | 'materializedViews'>>;
iconStyles?: Pick<TextProps, 'size' | 'color'>;
variant?: 'icon' | 'text';
}
@ -139,6 +139,16 @@ const SourceSchemaPreview = ({
});
}
const mvConfigs = source?.materializedViews ?? [];
tables.push(
...mvConfigs.map(({ tableName, databaseName }) => ({
databaseName,
tableName,
connectionId: source!.connection,
title: `${tableName} (MV)`,
})),
);
const isEnabled = !!source && tables.length > 0;
return (

View file

@ -5,6 +5,8 @@ import {
ResponseJSON,
} from '@hyperdx/common-utils/dist/clickhouse';
import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse/browser';
import { tryOptimizeConfigWithMaterializedView } from '@hyperdx/common-utils/dist/core/materializedViews';
import { Metadata } from '@hyperdx/common-utils/dist/core/metadata';
import {
DEFAULT_AUTO_GRANULARITY_MAX_BUCKETS,
isMetricChartConfig,
@ -29,7 +31,9 @@ import {
import { useClickhouseClient } from '@/clickhouse';
import { IS_MTVIEWS_ENABLED } from '@/config';
import { buildMTViewSelectQuery } from '@/hdxMTViews';
import { useMetadataWithSettings } from '@/hooks/useMetadata';
import { getMetadata } from '@/metadata';
import { useSource } from '@/source';
import { generateTimeWindowsDescending } from '@/utils/searchWindows';
interface AdditionalUseQueriedChartConfigOptions {
@ -128,12 +132,14 @@ async function* fetchDataInChunks({
signal,
enableQueryChunking = false,
enableParallelQueries = false,
metadata,
}: {
config: ChartConfigWithOptDateRange;
clickhouseClient: ClickhouseClient;
signal: AbortSignal;
enableQueryChunking?: boolean;
enableParallelQueries?: boolean;
metadata: Metadata;
}) {
const windows =
enableQueryChunking && shouldUseChunking(config)
@ -162,7 +168,7 @@ async function* fetchDataInChunks({
index,
queryResult: await clickhouseClient.queryChartConfig({
config: windowedConfig,
metadata: getMetadata(),
metadata,
opts: {
abort_signal: signal,
},
@ -203,7 +209,7 @@ async function* fetchDataInChunks({
const result = await clickhouseClient.queryChartConfig({
config: windowedConfig,
metadata: getMetadata(),
metadata,
opts: {
abort_signal: signal,
},
@ -248,8 +254,14 @@ export function useQueriedChartConfig(
options?: Partial<UseQueryOptions<TQueryFnData>> &
AdditionalUseQueriedChartConfigOptions,
) {
const { enabled = true } = options ?? {};
const clickhouseClient = useClickhouseClient();
const queryClient = useQueryClient();
const metadata = useMetadataWithSettings();
const { data: source, isLoading: isLoadingSource } = useSource({
id: config.source,
});
const query = useQuery<TQueryFnData, ClickHouseQueryError | Error>({
// Include enableQueryChunking in the query key to ensure that queries with the
@ -262,6 +274,16 @@ export function useQueriedChartConfig(
// TODO: Replace this with `streamedQuery` when it is no longer experimental. Use 'replace' refetch mode.
// https://tanstack.com/query/latest/docs/reference/streamedQuery
queryFn: async context => {
const optimizedConfig = source?.materializedViews?.length
? await tryOptimizeConfigWithMaterializedView(
config,
metadata,
clickhouseClient,
context.signal,
source,
)
: config;
const query = queryClient
.getQueryCache()
.find({ queryKey: context.queryKey, exact: true });
@ -275,11 +297,12 @@ export function useQueriedChartConfig(
};
const chunks = fetchDataInChunks({
config,
config: optimizedConfig,
clickhouseClient,
signal: context.signal,
enableQueryChunking: options?.enableQueryChunking,
enableParallelQueries: options?.enableParallelQueries,
metadata,
});
let accumulatedChunks: TQueryFnData = emptyValue;
@ -311,25 +334,54 @@ export function useQueriedChartConfig(
retry: 1,
refetchOnWindowFocus: false,
...options,
enabled: enabled && !isLoadingSource,
});
if (query.isError && options?.onError) {
options.onError(query.error);
}
return query;
return {
...query,
isLoading: query.isLoading || isLoadingSource,
};
}
export function useRenderedSqlChartConfig(
config: ChartConfigWithOptDateRange,
options?: UseQueryOptions<string>,
) {
return useQuery<string>({
const { enabled = true } = options ?? {};
const metadata = useMetadataWithSettings();
const clickhouseClient = useClickhouseClient();
const { data: source, isLoading: isLoadingSource } = useSource({
id: config.source,
});
const query = useQuery({
queryKey: ['renderedSql', config],
queryFn: async () => {
const query = await renderChartConfig(config, getMetadata());
queryFn: async ({ signal }) => {
const optimizedConfig = source?.materializedViews?.length
? await tryOptimizeConfigWithMaterializedView(
config,
metadata,
clickhouseClient,
signal,
source,
)
: config;
const query = await renderChartConfig(optimizedConfig, getMetadata());
return format(parameterizedQueryToSql(query));
},
...options,
enabled: enabled && !isLoadingSource,
});
return {
...query,
isLoading: query.isLoading || isLoadingSource,
};
}
export function useAliasMapFromChartConfig(

View file

@ -0,0 +1,55 @@
import {
MVOptimizationExplanation,
tryOptimizeConfigWithMaterializedViewWithExplanations,
} from '@hyperdx/common-utils/dist/core/materializedViews';
import { ChartConfigWithOptDateRange } from '@hyperdx/common-utils/dist/types';
import {
keepPreviousData,
useQuery,
UseQueryOptions,
} from '@tanstack/react-query';
import { useClickhouseClient } from '@/clickhouse';
import { useSource } from '@/source';
import { useMetadataWithSettings } from './useMetadata';
export interface MVOptimizationExplanationResult {
optimizedConfig?: ChartConfigWithOptDateRange;
explanations: MVOptimizationExplanation[];
}
export function useMVOptimizationExplanation(
config: ChartConfigWithOptDateRange | undefined,
options?: UseQueryOptions<MVOptimizationExplanationResult>,
) {
const { enabled = true } = options || {};
const metadata = useMetadataWithSettings();
const clickhouseClient = useClickhouseClient();
const { data: source, isLoading: isLoadingSource } = useSource({
id: config?.source,
});
return useQuery<MVOptimizationExplanationResult>({
queryKey: ['optimizationExplanation', config],
queryFn: async ({ signal }) => {
if (!config || !source) {
return {
explanations: [],
};
}
return await tryOptimizeConfigWithMaterializedViewWithExplanations(
config,
metadata,
clickhouseClient,
signal,
source,
);
},
placeholderData: keepPreviousData,
...options,
enabled: enabled && !isLoadingSource && !!config && !!source,
});
}

View file

@ -6,12 +6,17 @@ import {
ClickHouseQueryError,
ColumnMetaType,
} from '@hyperdx/common-utils/dist/clickhouse';
import { tryOptimizeConfigWithMaterializedView } from '@hyperdx/common-utils/dist/core/materializedViews';
import { Metadata } from '@hyperdx/common-utils/dist/core/metadata';
import { renderChartConfig } from '@hyperdx/common-utils/dist/core/renderChartConfig';
import {
isFirstOrderByAscending,
isTimestampExpressionInFirstOrderBy,
} from '@hyperdx/common-utils/dist/core/utils';
import { ChartConfigWithOptTimestamp } from '@hyperdx/common-utils/dist/types';
import {
ChartConfigWithOptTimestamp,
TSource,
} from '@hyperdx/common-utils/dist/types';
import {
QueryClient,
QueryFunction,
@ -21,7 +26,8 @@ import {
import api from '@/api';
import { getClickhouseClient } from '@/clickhouse';
import { getMetadata } from '@/metadata';
import { useMetadataWithSettings } from '@/hooks/useMetadata';
import { useSource } from '@/source';
import { omit } from '@/utils';
import {
generateTimeWindowsAscending,
@ -34,6 +40,7 @@ type TQueryKey = readonly [
ChartConfigWithOptTimestamp,
number | undefined,
];
function queryKeyFn(
prefix: string,
config: ChartConfigWithOptTimestamp,
@ -59,6 +66,13 @@ type TData = {
pageParams: TPageParam[];
};
type QueryMeta = {
queryClient: QueryClient;
hasPreviousQueries: boolean;
metadata: Metadata;
source?: TSource;
};
// Get time window from page param
function getTimeWindowFromPageParam(
config: ChartConfigWithOptTimestamp,
@ -130,16 +144,29 @@ const queryFn: QueryFunction<TQueryFnData, TQueryKey, TPageParam> = async ({
if (meta == null) {
throw new Error('Query missing client meta');
}
const queryClient = meta.queryClient as QueryClient;
const { queryClient, metadata, hasPreviousQueries, source } =
meta as QueryMeta;
// Only stream incrementally if this is a fresh query with no previous
// response or if it's a paginated query
// otherwise we'll flicker the UI with streaming data
const isStreamingIncrementally =
!meta.hasPreviousQueries ||
pageParam.offset > 0 ||
pageParam.windowIndex > 0;
!hasPreviousQueries || pageParam.offset > 0 || pageParam.windowIndex > 0;
const config = queryKey[1];
const queryTimeout = queryKey[2];
const clickhouseClient = getClickhouseClient({ queryTimeout });
const rawConfig = queryKey[1];
const config = source?.materializedViews?.length
? await tryOptimizeConfigWithMaterializedView(
rawConfig,
metadata,
clickhouseClient,
signal,
source,
)
: rawConfig;
// Get the time window for this page
const shouldUseWindowing = isTimestampExpressionInFirstOrderBy(config);
@ -162,11 +189,7 @@ const queryFn: QueryFunction<TQueryFnData, TQueryKey, TPageParam> = async ({
},
};
const query = await renderChartConfig(windowedConfig, getMetadata());
const queryTimeout = queryKey[2];
// TODO: it seems like queryTimeout is not being honored, we should fix this
const clickhouseClient = getClickhouseClient({ queryTimeout });
const query = await renderChartConfig(windowedConfig, metadata);
// Create abort signal from timeout if provided
const abortController = queryTimeout ? new AbortController() : undefined;
@ -371,9 +394,10 @@ export default function useOffsetPaginatedQuery(
queryKeyPrefix?: string;
} = {},
) {
const { data: meData } = api.useMe();
const { data: meData, isLoading: isLoadingMe } = api.useMe();
const key = queryKeyFn(queryKeyPrefix, config, meData?.team?.queryTimeout);
const queryClient = useQueryClient();
const metadata = useMetadataWithSettings();
const matchedQueries = queryClient.getQueriesData<TData>({
queryKey: [queryKeyPrefix, omit(config, ['dateRange'])],
});
@ -381,6 +405,10 @@ export default function useOffsetPaginatedQuery(
const hasPreviousQueries =
matchedQueries.filter(([_, data]) => data != null).length > 0;
const { data: source, isLoading: isLoadingSource } = useSource({
id: config.source,
});
const {
data,
fetchNextPage,
@ -401,7 +429,7 @@ export default function useOffsetPaginatedQuery(
// Only preserve previous query in live mode
return isLive ? prev : undefined;
},
enabled,
enabled: enabled && !isLoadingMe && !isLoadingSource,
initialPageParam: { windowIndex: 0, offset: 0 } as TPageParam,
getNextPageParam: (lastPage, allPages) => {
return getNextPageParam(lastPage, allPages, config);
@ -410,7 +438,9 @@ export default function useOffsetPaginatedQuery(
meta: {
queryClient,
hasPreviousQueries,
},
metadata,
source,
} satisfies QueryMeta,
queryFn,
gcTime: isLive ? ms('30s') : ms('5m'), // more aggressive gc for live data, since it can end up holding lots of data
retry: 1,
@ -426,7 +456,7 @@ export default function useOffsetPaginatedQuery(
data: flattenedData,
fetchNextPage,
hasNextPage,
isFetching,
isLoading,
isFetching: isFetching || isLoadingMe || isLoadingSource,
isLoading: isLoading || isLoadingMe || isLoadingSource,
};
}

View file

@ -0,0 +1,312 @@
import {
extractColumnReferencesFromKey,
filterColumnMetaByType,
JSDataType,
} from '@hyperdx/common-utils/dist/clickhouse';
import {
TableConnection,
TableMetadata,
} from '@hyperdx/common-utils/dist/core/metadata';
import { splitAndTrimWithBracket } from '@hyperdx/common-utils/dist/core/utils';
import {
InternalAggregateFunction,
MaterializedViewConfiguration,
} from '@hyperdx/common-utils/dist/types';
import { getMetadata } from '@/metadata';
export const MV_AGGREGATE_FUNCTIONS = [
'avg',
'count',
'max',
'min',
'quantile',
'sum',
'histogram',
];
const MV_DDL_PATTERN = /MATERIALIZED VIEW [^\s]+\.[^\s]+ TO ([^\s]+)\.([^\s]+)/;
function getViewTargetTable(meta: TableMetadata) {
const match = meta.create_table_query.match(MV_DDL_PATTERN);
if (match && match[1] && match[2]) {
return {
databaseName: match[1],
tableName: match[2],
};
}
}
const isAggregateFn = (
aggFn: string | undefined,
): aggFn is InternalAggregateFunction => {
return MV_AGGREGATE_FUNCTIONS.includes(aggFn ?? '');
};
function isMaterializedView(meta: TableMetadata) {
return meta.engine?.startsWith('MaterializedView') ?? false;
}
function isAggregatingMergeTree(meta: TableMetadata) {
return meta.engine?.includes('AggregatingMergeTree') ?? false;
}
/**
* Given a table that is either a materialized view or a table targeted by a materialized view,
* fetches the metadata for both the materialized view and the target table.
*
* Returns undefined if there are multiple materialized views targeting the given table,
* or if the target table is not an AggregatingMergeTree.
*/
async function getMetadataForMaterializedViewAndTable({
databaseName,
tableName,
connectionId,
}: TableConnection) {
try {
const metadata = getMetadata();
const givenMetadata = await metadata.getTableMetadata({
databaseName,
tableName,
connectionId,
});
if (isMaterializedView(givenMetadata)) {
const mvMetadata = givenMetadata;
const mvTableDetails = getViewTargetTable(mvMetadata);
if (mvTableDetails) {
const mvTableMetadata = await metadata.getTableMetadata({
...mvTableDetails,
connectionId,
});
return isAggregatingMergeTree(mvTableMetadata)
? { mvMetadata, mvTableMetadata }
: undefined;
}
} else if (isAggregatingMergeTree(givenMetadata)) {
const mvTableMetadata = givenMetadata;
const sourceViews = await metadata.queryMaterializedViewsByTarget({
databaseName,
tableName,
connectionId,
});
if (sourceViews.length === 1) {
const mvMetadata = await metadata.getTableMetadata({
...sourceViews[0],
connectionId,
});
return {
mvMetadata,
mvTableMetadata,
};
} else {
// We can't be sure which materialized view to use, so
// just return the target table metadata
return { mvTableMetadata };
}
}
} catch (e) {
console.error('Error fetching materialized view metadata', e);
}
}
/**
* Split the given materialized view's SELECT expression into individual column expressions.
*/
function extractSelectExpressions(meta: TableMetadata) {
const selectStr = meta.as_select ?? '';
// Remove the "SELECT" keyword and everything after "FROM"
const selectExpressionWithoutSelect = selectStr
.slice(0, selectStr.toLowerCase().indexOf('from'))
.replace(/^select/i, '')
.trim();
// Split into individual expressions (eg. ['toStartOfMinute(Timestamp) AS Timestamp', ...])
return splitAndTrimWithBracket(selectExpressionWithoutSelect);
}
/**
* Returns the granularity of the given timestamp column, if it can be inferred
* by looking for a toStartOf(Second|Minute|Hour|Day) function in the given list
* of select column expressions.
*
* Returns undefined if the granularity cannot be inferred.
**/
export function inferTimestampColumnGranularity(
mvMetadata: TableMetadata,
timestampColumn: string,
) {
try {
// Find any expression that uses toStartOfX on the timestamp column
const selectExpressions = extractSelectExpressions(mvMetadata);
const timestampExpression = selectExpressions.find(
expr => expr.match(/toStartOf|toDate/) && expr.includes(timestampColumn),
);
if (!timestampExpression) {
return undefined;
}
// Look for fixed interval functions
const intervalToGranularityMap: Record<string, string> = {
toStartOfSecond: '1 second',
toStartOfMinute: '1 minute',
toStartOfFiveMinutes: '5 minute',
toStartOfFifteenMinutes: '15 minute',
toStartOfHour: '1 hour',
toStartOfDay: '1 day',
toDate: '1 day',
toDateTime: '1 second',
};
for (const [func, granularity] of Object.entries(
intervalToGranularityMap,
)) {
if (timestampExpression?.includes(`${func}(`)) {
return granularity;
}
}
// Look for toStartOfInterval(Timestamp, INTERVAL X UNIT)
// Only accept specific granularities matching the ones defined above
if (timestampExpression.includes(`toStartOfInterval(`)) {
const intervalMatch = timestampExpression.match(
/INTERVAL\s+(\d+)\s+(SECOND|MINUTE|HOUR|DAY)\)/i,
);
const granularity = intervalMatch
? `${intervalMatch[1]} ${intervalMatch[2].toLowerCase()}`
: '';
if (
granularity &&
Object.values(intervalToGranularityMap).includes(granularity)
) {
return granularity;
}
}
} catch (e) {
console.error('Error inferring timestamp column granularity', e);
}
}
/**
* Attempts to a MaterializedViewConfiguration object from the given TableConnections
* by introspecting the view, target table, and source table.
*
* @param mvTableOrView - A TableConnection representing either the materialized view or the target table.
* @param sourceTable - A TableConnection representing the source table (the table the materialized view selects from).
*
* Returns undefined if the configuration cannot be inferred.
*/
export async function inferMaterializedViewConfig(
mvTableOrView: TableConnection,
sourceTable: TableConnection,
): Promise<MaterializedViewConfiguration | undefined> {
const { databaseName, tableName, connectionId } = mvTableOrView;
const { databaseName: sourceDatabaseName, tableName: sourceTableName } =
sourceTable;
if (!tableName) {
return undefined;
}
const meta = await getMetadataForMaterializedViewAndTable({
databaseName,
tableName,
connectionId,
});
if (!meta) {
return undefined;
}
const { mvMetadata, mvTableMetadata } = meta;
const metadata = getMetadata();
const [mvTableColumns, sourceTableColumns] = await Promise.all([
metadata.getColumns({
databaseName: mvTableMetadata.database,
tableName: mvTableMetadata.name,
connectionId,
}),
metadata.getColumns({
databaseName: sourceDatabaseName,
tableName: sourceTableName,
connectionId,
}),
]);
const sourceTableColumnNames = new Set(
sourceTableColumns.map(col => col.name),
);
const aggregatedColumns: MaterializedViewConfiguration['aggregatedColumns'] =
mvTableColumns
.filter(col => col.type.includes('AggregateFunction'))
.map(col => {
let aggFn: string | undefined = col.type.match(
/AggregateFunction\(([a-zA-Z0-9_]+)/,
)?.[1];
if (aggFn === 'sum' && col.name.toLowerCase().includes('count')) {
aggFn = 'count';
} else if (aggFn?.startsWith('quantile')) {
aggFn = 'quantile';
}
if (!isAggregateFn(aggFn)) {
return undefined;
}
// Convention: MV Columns are named "<aggFn>__<sourceColumn>"
const nameSuffix = col.name.split('__')[1];
const sourceColumn =
sourceTableColumnNames.has(nameSuffix) && aggFn !== 'count'
? nameSuffix
: '';
return {
mvColumn: col.name,
aggFn,
sourceColumn,
};
})
.filter(c => c != undefined);
// Infer the timestamp column
const primaryKeyColumns = new Set(
extractColumnReferencesFromKey(mvTableMetadata.primary_key),
);
const timestampColumns =
filterColumnMetaByType(mvTableColumns, [JSDataType.Date]) ?? [];
const timestampColumn =
timestampColumns?.find(c => primaryKeyColumns.has(c.name))?.name ?? '';
// Infer the granularity, if possible
let minGranularity = '';
if (mvMetadata) {
minGranularity =
inferTimestampColumnGranularity(mvMetadata, timestampColumn) ?? '';
}
// Infer the dimension columns
const dimensionColumns = mvTableColumns
.filter(
col =>
!col.type.includes('AggregateFunction') &&
!timestampColumns.includes(col),
)
.map(col => col.name)
.join(', ');
return {
databaseName: mvTableMetadata.database,
tableName: mvTableMetadata.name,
dimensionColumns,
minGranularity,
timestampColumn,
aggregatedColumns,
};
}

View file

@ -1,5 +1,13 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`renderChartConfig Aggregate Merge Functions should generate SQL for an aggregate merge function 1`] = `"SELECT avgMerge(Duration),severity FROM default.logs WHERE (timestamp >= fromUnixTimestamp64Milli(1739318400000) AND timestamp <= fromUnixTimestamp64Milli(1739491200000)) GROUP BY severity"`;
exports[`renderChartConfig Aggregate Merge Functions should generate SQL for an aggregate merge function with a condition 1`] = `"SELECT avgMergeIf(Duration, ((severity = 'ERROR')) AND toFloat64OrDefault(toString(Duration)) IS NOT NULL),severity FROM default.logs WHERE (((severity = 'ERROR'))) GROUP BY severity"`;
exports[`renderChartConfig Aggregate Merge Functions should generate SQL for an histogram merge function 1`] = `"SELECT histogramMerge(20)(Duration),severity FROM default.logs WHERE (timestamp >= fromUnixTimestamp64Milli(1739318400000) AND timestamp <= fromUnixTimestamp64Milli(1739491200000)) GROUP BY severity"`;
exports[`renderChartConfig Aggregate Merge Functions should generate SQL for an quantile merge function with a condition 1`] = `"SELECT quantileMergeIf(0.95)(Duration, ((severity = 'ERROR')) AND toFloat64OrDefault(toString(Duration)) IS NOT NULL),severity FROM default.logs WHERE (timestamp >= fromUnixTimestamp64Milli(1739318400000) AND timestamp <= fromUnixTimestamp64Milli(1739491200000)) AND (((severity = 'ERROR'))) GROUP BY severity"`;
exports[`renderChartConfig HAVING clause should not render HAVING clause when having is empty string 1`] = `"SELECT count(),severity FROM default.logs WHERE (timestamp >= fromUnixTimestamp64Milli(1739318400000) AND timestamp <= fromUnixTimestamp64Milli(1739491200000)) GROUP BY severity"`;
exports[`renderChartConfig HAVING clause should not render HAVING clause when not provided 1`] = `"SELECT count(),severity FROM default.logs WHERE (timestamp >= fromUnixTimestamp64Milli(1739318400000) AND timestamp <= fromUnixTimestamp64Milli(1739491200000)) GROUP BY severity"`;

View file

@ -12,13 +12,23 @@ describe('renderChartConfig', () => {
let mockMetadata: jest.Mocked<Metadata>;
beforeEach(() => {
const columns = [
{ name: 'timestamp', type: 'DateTime' },
{ name: 'value', type: 'Float64' },
{ name: 'TraceId', type: 'String' },
{ name: 'ServiceName', type: 'String' },
];
mockMetadata = {
getColumns: jest.fn().mockResolvedValue([
{ name: 'timestamp', type: 'DateTime' },
{ name: 'value', type: 'Float64' },
]),
getMaterializedColumnsLookupTable: jest.fn().mockResolvedValue(null),
getColumn: jest.fn().mockResolvedValue({ type: 'DateTime' }),
getColumn: jest
.fn()
.mockImplementation(async ({ column }) =>
columns.find(col => col.name === column),
),
getTableMetadata: jest
.fn()
.mockResolvedValue({ primary_key: 'timestamp' }),
@ -1005,4 +1015,122 @@ describe('renderChartConfig', () => {
},
);
});
describe('Aggregate Merge Functions', () => {
it('should generate SQL for an aggregate merge function', async () => {
const config: ChartConfigWithOptDateRange = {
displayType: DisplayType.Table,
connection: 'test-connection',
from: {
databaseName: 'default',
tableName: 'logs',
},
select: [
{
aggFn: 'avgMerge',
valueExpression: 'Duration',
},
],
where: '',
whereLanguage: 'sql',
groupBy: 'severity',
timestampValueExpression: 'timestamp',
dateRange: [new Date('2025-02-12'), new Date('2025-02-14')],
};
const generatedSql = await renderChartConfig(config, mockMetadata);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain('avgMerge(Duration)');
expect(actual).toMatchSnapshot();
});
it('should generate SQL for an aggregate merge function with a condition', async () => {
const config: ChartConfigWithOptDateRange = {
displayType: DisplayType.Table,
connection: 'test-connection',
from: {
databaseName: 'default',
tableName: 'logs',
},
select: [
{
aggFn: 'avgMerge',
valueExpression: 'Duration',
aggCondition: 'severity:"ERROR"',
aggConditionLanguage: 'lucene',
},
],
where: '',
whereLanguage: 'sql',
groupBy: 'severity',
};
const generatedSql = await renderChartConfig(config, mockMetadata);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain(
"avgMergeIf(Duration, ((severity = 'ERROR')) AND toFloat64OrDefault(toString(Duration)) IS NOT NULL)",
);
expect(actual).toMatchSnapshot();
});
it('should generate SQL for an quantile merge function with a condition', async () => {
const config: ChartConfigWithOptDateRange = {
displayType: DisplayType.Table,
connection: 'test-connection',
from: {
databaseName: 'default',
tableName: 'logs',
},
select: [
{
aggFn: 'quantileMerge',
aggCondition: 'severity:"ERROR"',
aggConditionLanguage: 'lucene',
valueExpression: 'Duration',
level: 0.95,
},
],
where: '',
whereLanguage: 'sql',
groupBy: 'severity',
timestampValueExpression: 'timestamp',
dateRange: [new Date('2025-02-12'), new Date('2025-02-14')],
};
const generatedSql = await renderChartConfig(config, mockMetadata);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain(
"quantileMergeIf(0.95)(Duration, ((severity = 'ERROR')) AND toFloat64OrDefault(toString(Duration)) IS NOT NULL)",
);
expect(actual).toMatchSnapshot();
});
it('should generate SQL for an histogram merge function', async () => {
const config: ChartConfigWithOptDateRange = {
displayType: DisplayType.Table,
connection: 'test-connection',
from: {
databaseName: 'default',
tableName: 'logs',
},
select: [
{
aggFn: 'histogramMerge',
valueExpression: 'Duration',
level: 20,
},
],
where: '',
whereLanguage: 'sql',
groupBy: 'severity',
timestampValueExpression: 'timestamp',
dateRange: [new Date('2025-02-12'), new Date('2025-02-14')],
};
const generatedSql = await renderChartConfig(config, mockMetadata);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain('histogramMerge(20)(Duration)');
expect(actual).toMatchSnapshot();
});
});
});

File diff suppressed because it is too large Load diff

View file

@ -646,6 +646,52 @@ export abstract class BaseClickhouseClient {
}
throw new Error('No result sets');
}
/**
* Checks whether the given chart config is valid by running an
* EXPLAIN query and returning whether the EXPLAIN succeeded
**/
async testChartConfigValidity({
config,
metadata,
opts,
}: {
config: ChartConfigWithOptDateRange;
metadata: Metadata;
opts?: {
abort_signal?: AbortSignal;
clickhouse_settings?: Record<string, any>;
};
}): Promise<{ isValid: boolean; rowEstimate?: number; error?: string }> {
try {
const renderedConfig = await renderChartConfig(config, metadata);
const explainedQuery = chSql`EXPLAIN ESTIMATE ${renderedConfig}`;
const result = await this.query<'JSON'>({
query: explainedQuery.sql,
query_params: explainedQuery.params,
format: 'JSON',
abort_signal: opts?.abort_signal,
connectionId: config.connection,
clickhouse_settings: opts?.clickhouse_settings,
});
const jsonResult = await result.json<{ rows: string | number }>();
const rowEstimate = Number(jsonResult.data[0]?.rows);
return {
isValid: true,
rowEstimate: Number.isNaN(rowEstimate) ? undefined : rowEstimate,
};
} catch (error: ClickHouseQueryError | unknown) {
return {
isValid: false,
error:
error instanceof ClickHouseQueryError
? error.message
: String('Error while constructing materialized view query'),
};
}
}
}
export const tableExpr = ({

View file

@ -0,0 +1,520 @@
import { BaseClickhouseClient } from '@/clickhouse';
import {
ChartConfigWithOptDateRange,
CteChartConfig,
InternalAggregateFunction,
InternalAggregateFunctionSchema,
MaterializedViewConfiguration,
TSource,
} from '@/types';
import { Metadata, TableConnection } from './metadata';
import { DEFAULT_AUTO_GRANULARITY_MAX_BUCKETS } from './renderChartConfig';
import {
convertDateRangeToGranularityString,
convertGranularityToSeconds,
} from './utils';
type SelectItem = Exclude<
ChartConfigWithOptDateRange['select'],
string
>[number];
async function isSimpleAggregateFunction(
tableConnection: TableConnection,
column: string,
metadata: Metadata,
) {
try {
const columnMeta = await metadata.getColumn({
...tableConnection,
column,
});
return !!columnMeta?.type.startsWith('SimpleAggregateFunction(');
} catch {
return false;
}
}
// Variants of quantile (ex. quantileExact, quantileDD, etc.)
async function getQuantileAggregateFunction(
tableConnection: TableConnection,
column: string,
metadata: Metadata,
) {
try {
const columnMeta = await metadata.getColumn({
...tableConnection,
column,
});
const type = columnMeta?.type;
if (!type) {
return undefined;
}
// Use regex to extract the quantile function name inside AggregateFunction(...)
// For example, AggregateFunction(quantile(0.95), Int64) --> quantile
// AggregateFunction(quantileTDigest(0.95), Int64) --> quantileTDigest
// AggregateFunction(quantileDD(0.001, 0.95), Int64) --> quantileDD
const match = type.match(/^AggregateFunction\(\s*([^(, ]+)\s*\(/);
return match?.[1];
} catch {
return undefined;
}
}
async function getAggregateMergeFunction(
tableConnection: TableConnection,
column: string,
aggFn: string,
metadata: Metadata,
) {
if (aggFn === 'count') {
// Counts are stored in AggregatingMergeTree as UInt64 or SimpleAggregateFunction(sum, UInt64),
// both of which should be summed rather than count()'ed.
return 'sum';
} else if (
await isSimpleAggregateFunction(tableConnection, column, metadata)
) {
return aggFn;
} else {
return `${aggFn}Merge`;
}
}
function isValidAggFn(
aggFn: string | undefined,
): aggFn is InternalAggregateFunction {
return !!aggFn && InternalAggregateFunctionSchema.safeParse(aggFn).success;
}
function isQuantileSelectItem(item: SelectItem): item is {
valueExpression: string;
aggFn: 'quantile';
level: number;
} {
return (
item.aggFn === 'quantile' &&
'level' in item &&
typeof item.level === 'number'
);
}
function getAggregatedColumnConfig(
mvConfig: MaterializedViewConfiguration,
column: string,
aggFn: InternalAggregateFunction,
) {
return mvConfig.aggregatedColumns.find(
config =>
config.aggFn === aggFn &&
(config.aggFn === 'count' || config.sourceColumn === column),
);
}
/**
* Indicates whether the MV described by mvConfig is capable of
* supporting the granularity requested in the given chart config.
**/
function mvConfigSupportsGranularity(
mvConfig: MaterializedViewConfiguration,
chartConfig: ChartConfigWithOptDateRange,
): boolean {
if (!chartConfig.granularity && !chartConfig.dateRange) {
return true;
}
// If granularity is not provided at all, but we have a date range, we need a way to
// determine if the MV granularity is sufficient for the date range. So we'll assume
// an 'auto' granularity and check that against the MV.
const normalizedGranularity = chartConfig.granularity || 'auto';
// 'auto' granularity requires a date range to determine effective granularity
if (normalizedGranularity === 'auto' && !chartConfig.dateRange) {
return false;
}
// Determine the effective granularity if the granularity is 'auto'
const chartGranularity =
normalizedGranularity === 'auto' && chartConfig.dateRange
? convertDateRangeToGranularityString(
chartConfig.dateRange,
DEFAULT_AUTO_GRANULARITY_MAX_BUCKETS,
)
: normalizedGranularity;
const chartGranularitySeconds = convertGranularityToSeconds(chartGranularity);
const mvGranularitySeconds = convertGranularityToSeconds(
mvConfig.minGranularity,
);
return chartGranularitySeconds >= mvGranularitySeconds;
}
const COUNT_FUNCTION_PATTERN = /\bcount(If)?\s*\(/i;
export function isUnsupportedCountFunction(selectItem: SelectItem): boolean {
return COUNT_FUNCTION_PATTERN.test(selectItem.valueExpression);
}
async function convertSelectToMaterializedViewSelect(
mvConfig: MaterializedViewConfiguration,
selectItem: SelectItem,
mvTableConnection: TableConnection,
metadata: Metadata,
): Promise<SelectItem> {
const { valueExpression, aggFn: initialAggFn } = selectItem;
// can be modified later for quantile
let aggFn = initialAggFn;
// Custom count() expressions are not yet optimizable, but they also won't fail the
// EXPLAIN check - instead they'll just return an incorrect result.
if (isUnsupportedCountFunction(selectItem)) {
throw new Error(
`Custom count() expressions are not supported with materialized views.`,
);
}
if (!aggFn) {
return selectItem;
}
if (!isValidAggFn(aggFn)) {
throw new Error(`Aggregate function ${aggFn} is not valid.`);
}
// Handle aggregations without a value expression (eg. count)
// NOTE: such aggregations may still have a valueExpression in the selectItem,
// but it should be ignored
const columnConfigNoSourceColumn = getAggregatedColumnConfig(
mvConfig,
'',
aggFn,
);
if (columnConfigNoSourceColumn) {
const targetColumn = columnConfigNoSourceColumn.mvColumn;
const aggMergeFn = await getAggregateMergeFunction(
mvTableConnection,
targetColumn,
aggFn,
metadata,
);
return {
...selectItem,
valueExpression: targetColumn,
aggFn: aggMergeFn,
};
}
const aggregatedColumnConfig = getAggregatedColumnConfig(
mvConfig,
valueExpression,
aggFn,
);
if (!aggregatedColumnConfig) {
throw new Error(
`The aggregate function ${formatAggregateFunction(aggFn, selectItem['level'])} is not available for column '${valueExpression}'.`,
);
}
if (isQuantileSelectItem(selectItem)) {
const quantileAggregateFunction = await getQuantileAggregateFunction(
mvTableConnection,
aggregatedColumnConfig.mvColumn,
metadata,
);
if (quantileAggregateFunction) {
aggFn = quantileAggregateFunction;
}
}
const aggMergeFn = await getAggregateMergeFunction(
mvTableConnection,
aggregatedColumnConfig.mvColumn,
aggFn,
metadata,
);
return {
...selectItem,
valueExpression: aggregatedColumnConfig.mvColumn,
aggFn: aggMergeFn,
};
}
export type MVOptimizationExplanation = {
success: boolean;
errors: string[];
rowEstimate?: number;
mvConfig: MaterializedViewConfiguration;
};
export async function tryConvertConfigToMaterializedViewSelect<
C extends ChartConfigWithOptDateRange | CteChartConfig,
>(
chartConfig: C,
mvConfig: MaterializedViewConfiguration,
metadata: Metadata,
): Promise<{
optimizedConfig?: C;
errors?: string[];
}> {
if (!Array.isArray(chartConfig.select)) {
return {
errors: ['Only array-based select statements are supported.'],
};
}
if (!mvConfigSupportsGranularity(mvConfig, chartConfig)) {
const error = chartConfig.granularity
? `Granularity must be at least ${mvConfig.minGranularity}.`
: 'The selected date range is too short for the granularity of this materialized view.';
return { errors: [error] };
}
const mvTableConnection: TableConnection = {
databaseName: mvConfig.databaseName,
tableName: mvConfig.tableName,
connectionId: chartConfig.connection,
};
const conversions = await Promise.allSettled(
chartConfig.select.map(selectItem =>
convertSelectToMaterializedViewSelect(
mvConfig,
selectItem,
mvTableConnection,
metadata,
),
),
);
const select: SelectItem[] = [];
const errors: string[] = [];
for (const result of conversions) {
if (result.status === 'rejected') {
errors.push(result.reason.message);
} else {
select.push(result.value);
}
}
if (errors.length > 0) {
return {
errors,
};
}
const clonedConfig = {
...structuredClone(chartConfig),
select,
from: {
databaseName: mvConfig.databaseName,
tableName: mvConfig.tableName,
},
};
return {
optimizedConfig: clonedConfig,
};
}
/** Attempts to optimize a config with a single MV Config */
async function tryOptimizeConfig<C extends ChartConfigWithOptDateRange>(
config: C,
metadata: Metadata,
clickhouseClient: BaseClickhouseClient,
signal: AbortSignal,
mvConfig: MaterializedViewConfiguration,
sourceFrom: TSource['from'],
) {
const errors: string[] = [];
// Attempt to optimize any CTEs that exist in the config
let optimizedConfig: C | undefined = undefined;
if (config.with) {
const cteOptimizationResults = await Promise.all(
config.with.map(async cte => {
if (
cte.chartConfig &&
cte.chartConfig.from.databaseName === sourceFrom.databaseName &&
cte.chartConfig.from.tableName === sourceFrom.tableName
) {
return tryConvertConfigToMaterializedViewSelect(
cte.chartConfig,
mvConfig,
metadata,
);
} else {
return {
optimizedConfig: undefined,
errors: [],
};
}
}),
);
const hasOptimizedCTEs = cteOptimizationResults.some(
r => !!r.optimizedConfig,
);
if (hasOptimizedCTEs) {
optimizedConfig = {
...structuredClone(config),
with: config.with.map((originalCte, index) => {
return {
...originalCte,
chartConfig:
cteOptimizationResults[index].optimizedConfig ??
originalCte.chartConfig,
};
}),
};
}
errors.push(...cteOptimizationResults.flatMap(r => r.errors ?? []));
}
// Attempt to optimize the main (outer) select
if (
config.from.databaseName === sourceFrom.databaseName &&
config.from.tableName === sourceFrom.tableName
) {
const convertedOuterSelect = await tryConvertConfigToMaterializedViewSelect(
optimizedConfig ?? config,
mvConfig,
metadata,
);
if (convertedOuterSelect.optimizedConfig) {
optimizedConfig = convertedOuterSelect.optimizedConfig;
}
errors.push(...(convertedOuterSelect.errors ?? []));
}
// If the config has been optimized, validate it by checking whether an EXPLAIN query succeeds
if (optimizedConfig) {
const {
isValid,
rowEstimate = Number.POSITIVE_INFINITY,
error,
} = await clickhouseClient.testChartConfigValidity({
config: optimizedConfig,
metadata,
opts: {
abort_signal: signal,
},
});
if (error) {
errors.push(error);
}
if (isValid) {
return {
optimizedConfig,
rowEstimate,
errors: [],
};
}
}
return { errors };
}
/** Attempts to optimize a config with each of the provided MV Configs */
export async function tryOptimizeConfigWithMaterializedViewWithExplanations<
C extends ChartConfigWithOptDateRange,
>(
config: C,
metadata: Metadata,
clickhouseClient: BaseClickhouseClient,
signal: AbortSignal,
source: Pick<TSource, 'from'> & Partial<Pick<TSource, 'materializedViews'>>,
): Promise<{
optimizedConfig?: C;
explanations: MVOptimizationExplanation[];
}> {
const mvConfigs = source.materializedViews ?? [];
const optimizationResults = await Promise.all(
mvConfigs.map(mvConfig =>
tryOptimizeConfig(
config,
metadata,
clickhouseClient,
signal,
mvConfig,
source.from,
).then(result => ({ ...result, mvConfig })),
),
);
// Find a config with the lowest row estimate among successfully optimized configs
let resultOptimizedConfig: C | undefined = undefined;
let minRowEstimate = Number.POSITIVE_INFINITY;
for (const result of optimizationResults) {
if (
result.optimizedConfig &&
(result.rowEstimate ?? Number.POSITIVE_INFINITY) < minRowEstimate
) {
resultOptimizedConfig = result.optimizedConfig;
minRowEstimate = result.rowEstimate ?? Number.POSITIVE_INFINITY;
}
}
const explanations = optimizationResults.map(
({ optimizedConfig, errors, rowEstimate, mvConfig }) => ({
success: !!optimizedConfig && optimizedConfig === resultOptimizedConfig,
errors,
rowEstimate,
mvConfig,
}),
);
return {
optimizedConfig: resultOptimizedConfig,
explanations,
};
}
export async function tryOptimizeConfigWithMaterializedView<
C extends ChartConfigWithOptDateRange,
>(
config: C,
metadata: Metadata,
clickhouseClient: BaseClickhouseClient,
signal: AbortSignal,
source: Pick<TSource, 'from'> & Partial<Pick<TSource, 'materializedViews'>>,
) {
const { optimizedConfig } =
await tryOptimizeConfigWithMaterializedViewWithExplanations(
config,
metadata,
clickhouseClient,
signal,
source,
);
return optimizedConfig ?? config;
}
function formatAggregateFunction(aggFn: string, level: number | undefined) {
if (aggFn === 'quantile') {
switch (level) {
case 0.5:
return 'median';
case 0.9:
return 'p90';
case 0.95:
return 'p95';
case 0.99:
return 'p99';
default:
return `quantile`;
}
} else {
return aggFn;
}
}

View file

@ -146,6 +146,34 @@ export class Metadata {
);
}
/** Queries and returns the list of materialized views which insert into the given target table */
async queryMaterializedViewsByTarget({
databaseName,
tableName,
connectionId,
}: TableConnection) {
return this.cache.getOrFetch(
`${connectionId}.${databaseName}.${tableName}.sourceMaterializedViews`,
async () => {
const toDatabaseTable = `%TO ${databaseName}.${tableName}%`;
const sql = chSql`
SELECT database as databaseName, name as tableName
FROM system.tables
WHERE engine = 'MaterializedView'
AND create_table_query LIKE ${{ String: toDatabaseTable }}`;
const json = await this.clickhouseClient
.query<'JSON'>({
connectionId,
query: sql.sql,
query_params: sql.params,
clickhouse_settings: this.getClickHouseSettings(),
})
.then(res => res.json<{ databaseName: string; tableName: string }>());
return json.data;
},
);
}
async getColumns({
databaseName,
tableName,

View file

@ -284,12 +284,12 @@ const fastifySQL = ({
const aggFnExpr = ({
fn,
expr,
quantileLevel,
level,
where,
}: {
fn: AggregateFunction | AggregateFunctionWithCombinators;
expr?: string;
quantileLevel?: number;
level?: number;
where?: string;
}) => {
const isAny = fn === 'any';
@ -304,9 +304,21 @@ const aggFnExpr = ({
const whereWithExtraNullCheck = `${where} AND ${unsafeExpr.UNSAFE_RAW_SQL} IS NOT NULL`;
if (fn.endsWith('Merge')) {
return chSql`${fn}(${{
UNSAFE_RAW_SQL: expr ?? '',
}})`;
const renderedFnArgs = chSql`${{ UNSAFE_RAW_SQL: expr ?? '' }}`;
const shouldParameterizeWithLevel =
level && (fn.startsWith('quantile') || fn.startsWith('histogram'));
const renderedFnArgsWithQuantileLevel = shouldParameterizeWithLevel
? chSql`(${{
UNSAFE_RAW_SQL: Number.isFinite(level) ? `${level}` : '0',
}})`
: [];
if (isWhereUsed) {
return chSql`${fn}If${renderedFnArgsWithQuantileLevel}(${renderedFnArgs}, ${{ UNSAFE_RAW_SQL: whereWithExtraNullCheck }})`;
} else {
return chSql`${fn}${renderedFnArgsWithQuantileLevel}(${renderedFnArgs})`;
}
}
// TODO: merge this chunk with the rest of logics
else if (fn.endsWith('State')) {
@ -342,13 +354,11 @@ const aggFnExpr = ({
}}${isWhereUsed ? chSql`, ${{ UNSAFE_RAW_SQL: where }}` : ''})`;
}
if (quantileLevel != null) {
return chSql`quantile${isWhereUsed ? 'If' : ''}(${{
if (level != null) {
return chSql`${fn}${isWhereUsed ? 'If' : ''}(${{
// Using Float64 param leads to an added coersion, but we don't need to
// escape number values anyways
UNSAFE_RAW_SQL: Number.isFinite(quantileLevel)
? `${quantileLevel}`
: '0',
UNSAFE_RAW_SQL: Number.isFinite(level) ? `${level}` : '0',
}})(${unsafeExpr}${
isWhereUsed
? chSql`, ${{ UNSAFE_RAW_SQL: whereWithExtraNullCheck }}`
@ -424,12 +434,15 @@ async function renderSelectList(
with: chartConfig.with,
})
: chSql`${{ UNSAFE_RAW_SQL: select.valueExpression }}`;
} else if (select.aggFn === 'quantile') {
} else if (
select.aggFn.startsWith('quantile') ||
select.aggFn.startsWith('histogram')
) {
expr = aggFnExpr({
fn: select.aggFn,
expr: select.valueExpression,
// @ts-ignore (TS doesn't know that we've already checked for quantile)
quantileLevel: select.level,
// @ts-expect-error (TS doesn't know that we've already checked for quantile)
level: select.level,
where: whereClause.sql,
});
} else {

View file

@ -64,6 +64,11 @@ export const AggregateFunctionSchema = z.enum([
'any',
'none',
]);
export const InternalAggregateFunctionSchema = z.enum([
...AggregateFunctionSchema.options,
// Not exposed to the user directly, but used in pre-built dashboards
'histogram',
]);
export const AggregateFunctionWithCombinatorsSchema = z
.string()
.regex(/^(\w+)If(State|Merge)$/);
@ -82,7 +87,12 @@ export const RootValueExpressionSchema = z
})
.or(
z.object({
aggFn: z.literal('quantile'),
aggFn: z.union([
z.literal('quantile'),
z.literal('quantileMerge'),
z.literal('histogram'),
z.literal('histogramMerge'),
]),
level: z.number(),
aggCondition: SearchConditionSchema,
aggConditionLanguage: SearchConditionLanguageSchema,
@ -165,6 +175,9 @@ export type SearchConditionLanguage = z.infer<
typeof SearchConditionLanguageSchema
>;
export type AggregateFunction = z.infer<typeof AggregateFunctionSchema>;
export type InternalAggregateFunction = z.infer<
typeof InternalAggregateFunctionSchema
>;
export type AggregateFunctionWithCombinators = z.infer<
typeof AggregateFunctionWithCombinatorsSchema
>;
@ -390,6 +403,7 @@ export const _ChartConfigSchema = z.object({
// Used to preserve original table select string when chart overrides it (e.g., histograms)
eventTableSelect: z.string().optional(),
compareToPreviousPeriod: z.boolean().optional(),
source: z.string().optional(),
});
// This is a ChartConfig type without the `with` CTE clause included.
@ -469,6 +483,7 @@ export const SavedChartConfigSchema = z
_ChartConfigSchema.omit({
connection: true,
timestampValueExpression: true,
source: true, // Omit the optional source here since it's required above
}).shape,
)
.extend(
@ -593,6 +608,38 @@ const HighlightedAttributeExpressionsSchema = z.array(
}),
);
const AggregatedColumnConfigSchema = z
.object({
sourceColumn: z.string().optional(),
aggFn: InternalAggregateFunctionSchema,
mvColumn: z.string().min(1, 'Materialized View Column is required'),
})
.refine(
({ sourceColumn, aggFn }) => aggFn === 'count' || !!sourceColumn?.length,
{ message: 'Materialized View Source Column is required' },
);
export type AggregatedColumnConfig = z.infer<
typeof AggregatedColumnConfigSchema
>;
export const MaterializedViewConfigurationSchema = z.object({
databaseName: z.string().min(1, 'Materialized View Database is required'),
tableName: z.string().min(1, 'Materialized View Table is required'),
dimensionColumns: z.string(),
minGranularity: SQLIntervalSchema,
timestampColumn: z
.string()
.min(1, 'Materialized View Timestamp column is required'),
aggregatedColumns: z
.array(AggregatedColumnConfigSchema)
.min(1, 'At least one aggregated column is required'),
});
export type MaterializedViewConfiguration = z.infer<
typeof MaterializedViewConfigurationSchema
>;
// Log source form schema
const LogSourceAugmentation = {
kind: z.literal(SourceKind.Log),
@ -619,6 +666,7 @@ const LogSourceAugmentation = {
HighlightedAttributeExpressionsSchema.optional(),
highlightedRowAttributeExpressions:
HighlightedAttributeExpressionsSchema.optional(),
materializedViews: z.array(MaterializedViewConfigurationSchema).optional(),
};
// Trace source form schema
@ -653,6 +701,7 @@ const TraceSourceAugmentation = {
HighlightedAttributeExpressionsSchema.optional(),
highlightedRowAttributeExpressions:
HighlightedAttributeExpressionsSchema.optional(),
materializedViews: z.array(MaterializedViewConfigurationSchema).optional(),
};
// Session source form schema