feat: Auto-link correlated sources bidirectionally (#990)

Automatically maintain bidirectional relationships between correlated sources. When a user selects a correlated source (e.g., Log → Metric), the target source is updated to link back (Metric → Log) if not already linked.

- Works for both new and existing sources
- Preserves existing correlations (no overwriting)
- Improves data consistency across the application
This commit is contained in:
Dan Hable 2025-07-24 10:41:56 -05:00 committed by GitHub
parent 4ce81d42b0
commit bb37520541
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 361 additions and 4 deletions

View file

@ -0,0 +1,5 @@
---
"@hyperdx/app": minor
---
Correlated source field links are bidirectional by default and no link exists.

View file

@ -38,6 +38,7 @@ import {
useCreateSource,
useDeleteSource,
useSource,
useSources,
useUpdateSource,
} from '@/source';
@ -56,6 +57,39 @@ const OTEL_CLICKHOUSE_EXPRESSIONS = {
resourceAttributesExpression: 'ResourceAttributes',
};
const CORRELATION_FIELD_MAP: Record<
SourceKind,
Record<string, { targetKind: SourceKind; targetField: keyof TSource }[]>
> = {
[SourceKind.Log]: {
metricSourceId: [
{ targetKind: SourceKind.Metric, targetField: 'logSourceId' },
],
traceSourceId: [
{ targetKind: SourceKind.Trace, targetField: 'logSourceId' },
],
},
[SourceKind.Trace]: {
logSourceId: [{ targetKind: SourceKind.Log, targetField: 'traceSourceId' }],
sessionSourceId: [
{ targetKind: SourceKind.Session, targetField: 'traceSourceId' },
],
metricSourceId: [
{ targetKind: SourceKind.Metric, targetField: 'logSourceId' },
],
},
[SourceKind.Session]: {
traceSourceId: [
{ targetKind: SourceKind.Trace, targetField: 'sessionSourceId' },
],
},
[SourceKind.Metric]: {
logSourceId: [
{ targetKind: SourceKind.Log, targetField: 'metricSourceId' },
],
},
};
function FormRow({
label,
children,
@ -906,6 +940,62 @@ export function TableSourceForm({
const updateSource = useUpdateSource();
const deleteSource = useDeleteSource();
// Bidirectional source linking
const { data: sources } = useSources();
const currentSourceId = watch('id');
useEffect(() => {
const { unsubscribe } = watch(async (_value, { name, type }) => {
const value = _value as TSourceUnion;
if (!currentSourceId || !sources || type !== 'change') return;
const correlationFields = CORRELATION_FIELD_MAP[kind];
if (!correlationFields || !name || !(name in correlationFields)) return;
const fieldName = name as keyof TSourceUnion;
const newTargetSourceId = value[fieldName] as string | undefined;
const targetConfigs = correlationFields[fieldName];
for (const { targetKind, targetField } of targetConfigs) {
// Find the previously linked source if any
const previouslyLinkedSource = sources.find(
s => s.kind === targetKind && s[targetField] === currentSourceId,
);
// If there was a previously linked source and it's different from the new one, unlink it
if (
previouslyLinkedSource &&
previouslyLinkedSource.id !== newTargetSourceId
) {
await updateSource.mutateAsync({
source: {
...previouslyLinkedSource,
[targetField]: undefined,
} as TSource,
});
}
// If a new source is selected, link it back
if (newTargetSourceId) {
const targetSource = sources.find(s => s.id === newTargetSourceId);
if (targetSource && targetSource.kind === targetKind) {
// Only update if the target field is empty to avoid overwriting existing correlations
if (!targetSource[targetField]) {
await updateSource.mutateAsync({
source: {
...targetSource,
[targetField]: currentSourceId,
} as TSource,
});
}
}
}
}
});
return () => unsubscribe();
}, [watch, kind, currentSourceId, sources, updateSource]);
const sourceFormSchema = sourceSchemaWithout({ id: true });
const handleError = (error: z.ZodError<TSourceUnion>) => {
const errors = error.errors;
@ -933,18 +1023,47 @@ export function TableSourceForm({
const _onCreate = useCallback(() => {
clearErrors();
handleSubmit(data => {
handleSubmit(async data => {
const parseResult = sourceFormSchema.safeParse(data);
if (parseResult.error) {
handleError(parseResult.error);
return;
}
createSource.mutate(
// TODO: HDX-1768 get rid of this type assertion
{ source: data as TSource },
{
onSuccess: data => {
onCreate?.(data);
onSuccess: async newSource => {
// Handle bidirectional linking for new sources
const correlationFields = CORRELATION_FIELD_MAP[newSource.kind];
if (correlationFields && sources) {
for (const [fieldName, targetConfigs] of Object.entries(
correlationFields,
)) {
const targetSourceId = (newSource as any)[fieldName];
if (targetSourceId) {
for (const { targetKind, targetField } of targetConfigs) {
const targetSource = sources.find(
s => s.id === targetSourceId,
);
if (targetSource && targetSource.kind === targetKind) {
// Only update if the target field is empty to avoid overwriting existing correlations
if (!targetSource[targetField]) {
await updateSource.mutateAsync({
source: {
...targetSource,
[targetField]: newSource.id,
} as TSource,
});
}
}
}
}
}
}
onCreate?.(newSource);
notifications.show({
color: 'green',
message: 'Source created',
@ -959,7 +1078,15 @@ export function TableSourceForm({
},
);
})();
}, [handleSubmit, createSource, onCreate, kind, formState]);
}, [
handleSubmit,
createSource,
onCreate,
kind,
formState,
sources,
updateSource,
]);
const _onSave = useCallback(() => {
clearErrors();

View file

@ -0,0 +1,225 @@
import { SourceKind, TSource } from '@hyperdx/common-utils/dist/types';
import '@testing-library/jest-dom';
describe('TableSourceForm Bidirectional Linking', () => {
describe('Bidirectional Source Linking Logic', () => {
it('should establish bidirectional link when creating a log source with trace correlation', async () => {
const mockUpdateSourceMutateAsync = jest.fn();
const mockOnCreate = jest.fn();
// Mock sources data
const sources = [
{
id: 'trace-source-1',
name: 'Trace Source 1',
kind: SourceKind.Trace,
logSourceId: undefined,
},
];
const newLogSource: TSource = {
id: 'new-log-source',
name: 'New Log Source',
kind: SourceKind.Log,
connection: 'conn-1',
from: {
databaseName: 'default',
tableName: 'new_logs',
},
timestampValueExpression: 'Timestamp',
traceSourceId: 'trace-source-1', // This will trigger bidirectional linking
} as TSource;
// This is the bidirectional linking logic from the component
const correlationFields: Record<
string,
Array<{ targetKind: SourceKind; targetField: string }>
> = {
traceSourceId: [
{ targetKind: SourceKind.Trace, targetField: 'logSourceId' },
],
};
// Execute the bidirectional linking logic
for (const [fieldName, targetConfigs] of Object.entries(
correlationFields,
)) {
const targetSourceId = (newLogSource as any)[fieldName];
if (targetSourceId) {
for (const { targetKind, targetField } of targetConfigs) {
const targetSource = sources.find(s => s.id === targetSourceId);
if (targetSource && targetSource.kind === targetKind) {
// Only update if the target field is empty
if (!(targetSource as any)[targetField]) {
await mockUpdateSourceMutateAsync({
source: {
...targetSource,
[targetField]: newLogSource.id,
} as TSource,
});
}
}
}
}
}
mockOnCreate(newLogSource);
// Verify that the bidirectional linking was established
expect(mockUpdateSourceMutateAsync).toHaveBeenCalledWith(
expect.objectContaining({
source: expect.objectContaining({
id: 'trace-source-1',
logSourceId: 'new-log-source',
}),
}),
);
expect(mockOnCreate).toHaveBeenCalledWith(newLogSource);
});
it('should establish bidirectional link when creating a trace source with log correlation', async () => {
const mockUpdateSourceMutateAsync = jest.fn();
const mockOnCreate = jest.fn();
// Mock sources data
const sources = [
{
id: 'log-source-1',
name: 'Log Source 1',
kind: SourceKind.Log,
traceSourceId: undefined,
},
];
const newTraceSource: TSource = {
id: 'new-trace-source',
name: 'New Trace Source',
kind: SourceKind.Trace,
connection: 'conn-1',
from: {
databaseName: 'default',
tableName: 'new_traces',
},
timestampValueExpression: 'Timestamp',
logSourceId: 'log-source-1', // This will trigger bidirectional linking
} as TSource;
// This is the bidirectional linking logic from the component for trace sources
const correlationFields: Record<
string,
Array<{ targetKind: SourceKind; targetField: string }>
> = {
logSourceId: [
{ targetKind: SourceKind.Log, targetField: 'traceSourceId' },
],
};
// Execute the bidirectional linking logic
for (const [fieldName, targetConfigs] of Object.entries(
correlationFields,
)) {
const targetSourceId = (newTraceSource as any)[fieldName];
if (targetSourceId) {
for (const { targetKind, targetField } of targetConfigs) {
const targetSource = sources.find(s => s.id === targetSourceId);
if (targetSource && targetSource.kind === targetKind) {
// Only update if the target field is empty
if (!(targetSource as any)[targetField]) {
await mockUpdateSourceMutateAsync({
source: {
...targetSource,
[targetField]: newTraceSource.id,
} as TSource,
});
}
}
}
}
}
mockOnCreate(newTraceSource);
// Verify that the bidirectional linking was established
expect(mockUpdateSourceMutateAsync).toHaveBeenCalledWith(
expect.objectContaining({
source: expect.objectContaining({
id: 'log-source-1',
traceSourceId: 'new-trace-source',
}),
}),
);
expect(mockOnCreate).toHaveBeenCalledWith(newTraceSource);
});
it('should not establish bidirectional link if target source already has a correlation', async () => {
const mockUpdateSourceMutateAsync = jest.fn();
const mockOnCreate = jest.fn();
// Mock sources with existing correlations
const sources = [
{
id: 'log-source-1',
name: 'Log Source 1',
kind: SourceKind.Log,
traceSourceId: 'existing-trace-source', // Already has a correlation
},
];
const newTraceSource: TSource = {
id: 'new-trace-source',
name: 'New Trace Source',
kind: SourceKind.Trace,
connection: 'conn-1',
from: {
databaseName: 'default',
tableName: 'new_traces',
},
timestampValueExpression: 'Timestamp',
logSourceId: 'log-source-1', // This should NOT establish bidirectional link
} as TSource;
// This is the bidirectional linking logic from the component for trace sources
const correlationFields: Record<
string,
Array<{ targetKind: SourceKind; targetField: string }>
> = {
logSourceId: [
{ targetKind: SourceKind.Log, targetField: 'traceSourceId' },
],
};
// Execute the bidirectional linking logic
for (const [fieldName, targetConfigs] of Object.entries(
correlationFields,
)) {
const targetSourceId = (newTraceSource as any)[fieldName];
if (targetSourceId) {
for (const { targetKind, targetField } of targetConfigs) {
const targetSource = sources.find(s => s.id === targetSourceId);
if (targetSource && targetSource.kind === targetKind) {
// Only update if the target field is empty
if (!(targetSource as any)[targetField]) {
await mockUpdateSourceMutateAsync({
source: {
...targetSource,
[targetField]: newTraceSource.id,
} as TSource,
});
}
}
}
}
}
mockOnCreate(newTraceSource);
// Verify that NO bidirectional linking was established since the log source already has a trace correlation
expect(mockUpdateSourceMutateAsync).not.toHaveBeenCalled();
expect(mockOnCreate).toHaveBeenCalledWith(newTraceSource);
});
});
});