mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
feat: Auto-link correlated sources bidirectionally (#990)
Automatically maintain bidirectional relationships between correlated sources. When a user selects a correlated source (e.g., Log → Metric), the target source is updated to link back (Metric → Log) if not already linked. - Works for both new and existing sources - Preserves existing correlations (no overwriting) - Improves data consistency across the application
This commit is contained in:
parent
4ce81d42b0
commit
bb37520541
3 changed files with 361 additions and 4 deletions
5
.changeset/chilly-owls-know.md
Normal file
5
.changeset/chilly-owls-know.md
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
"@hyperdx/app": minor
|
||||
---
|
||||
|
||||
Correlated source field links are bidirectional by default and no link exists.
|
||||
|
|
@ -38,6 +38,7 @@ import {
|
|||
useCreateSource,
|
||||
useDeleteSource,
|
||||
useSource,
|
||||
useSources,
|
||||
useUpdateSource,
|
||||
} from '@/source';
|
||||
|
||||
|
|
@ -56,6 +57,39 @@ const OTEL_CLICKHOUSE_EXPRESSIONS = {
|
|||
resourceAttributesExpression: 'ResourceAttributes',
|
||||
};
|
||||
|
||||
const CORRELATION_FIELD_MAP: Record<
|
||||
SourceKind,
|
||||
Record<string, { targetKind: SourceKind; targetField: keyof TSource }[]>
|
||||
> = {
|
||||
[SourceKind.Log]: {
|
||||
metricSourceId: [
|
||||
{ targetKind: SourceKind.Metric, targetField: 'logSourceId' },
|
||||
],
|
||||
traceSourceId: [
|
||||
{ targetKind: SourceKind.Trace, targetField: 'logSourceId' },
|
||||
],
|
||||
},
|
||||
[SourceKind.Trace]: {
|
||||
logSourceId: [{ targetKind: SourceKind.Log, targetField: 'traceSourceId' }],
|
||||
sessionSourceId: [
|
||||
{ targetKind: SourceKind.Session, targetField: 'traceSourceId' },
|
||||
],
|
||||
metricSourceId: [
|
||||
{ targetKind: SourceKind.Metric, targetField: 'logSourceId' },
|
||||
],
|
||||
},
|
||||
[SourceKind.Session]: {
|
||||
traceSourceId: [
|
||||
{ targetKind: SourceKind.Trace, targetField: 'sessionSourceId' },
|
||||
],
|
||||
},
|
||||
[SourceKind.Metric]: {
|
||||
logSourceId: [
|
||||
{ targetKind: SourceKind.Log, targetField: 'metricSourceId' },
|
||||
],
|
||||
},
|
||||
};
|
||||
|
||||
function FormRow({
|
||||
label,
|
||||
children,
|
||||
|
|
@ -906,6 +940,62 @@ export function TableSourceForm({
|
|||
const updateSource = useUpdateSource();
|
||||
const deleteSource = useDeleteSource();
|
||||
|
||||
// Bidirectional source linking
|
||||
const { data: sources } = useSources();
|
||||
const currentSourceId = watch('id');
|
||||
|
||||
useEffect(() => {
|
||||
const { unsubscribe } = watch(async (_value, { name, type }) => {
|
||||
const value = _value as TSourceUnion;
|
||||
if (!currentSourceId || !sources || type !== 'change') return;
|
||||
|
||||
const correlationFields = CORRELATION_FIELD_MAP[kind];
|
||||
if (!correlationFields || !name || !(name in correlationFields)) return;
|
||||
|
||||
const fieldName = name as keyof TSourceUnion;
|
||||
const newTargetSourceId = value[fieldName] as string | undefined;
|
||||
const targetConfigs = correlationFields[fieldName];
|
||||
|
||||
for (const { targetKind, targetField } of targetConfigs) {
|
||||
// Find the previously linked source if any
|
||||
const previouslyLinkedSource = sources.find(
|
||||
s => s.kind === targetKind && s[targetField] === currentSourceId,
|
||||
);
|
||||
|
||||
// If there was a previously linked source and it's different from the new one, unlink it
|
||||
if (
|
||||
previouslyLinkedSource &&
|
||||
previouslyLinkedSource.id !== newTargetSourceId
|
||||
) {
|
||||
await updateSource.mutateAsync({
|
||||
source: {
|
||||
...previouslyLinkedSource,
|
||||
[targetField]: undefined,
|
||||
} as TSource,
|
||||
});
|
||||
}
|
||||
|
||||
// If a new source is selected, link it back
|
||||
if (newTargetSourceId) {
|
||||
const targetSource = sources.find(s => s.id === newTargetSourceId);
|
||||
if (targetSource && targetSource.kind === targetKind) {
|
||||
// Only update if the target field is empty to avoid overwriting existing correlations
|
||||
if (!targetSource[targetField]) {
|
||||
await updateSource.mutateAsync({
|
||||
source: {
|
||||
...targetSource,
|
||||
[targetField]: currentSourceId,
|
||||
} as TSource,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return () => unsubscribe();
|
||||
}, [watch, kind, currentSourceId, sources, updateSource]);
|
||||
|
||||
const sourceFormSchema = sourceSchemaWithout({ id: true });
|
||||
const handleError = (error: z.ZodError<TSourceUnion>) => {
|
||||
const errors = error.errors;
|
||||
|
|
@ -933,18 +1023,47 @@ export function TableSourceForm({
|
|||
|
||||
const _onCreate = useCallback(() => {
|
||||
clearErrors();
|
||||
handleSubmit(data => {
|
||||
handleSubmit(async data => {
|
||||
const parseResult = sourceFormSchema.safeParse(data);
|
||||
if (parseResult.error) {
|
||||
handleError(parseResult.error);
|
||||
return;
|
||||
}
|
||||
|
||||
createSource.mutate(
|
||||
// TODO: HDX-1768 get rid of this type assertion
|
||||
{ source: data as TSource },
|
||||
{
|
||||
onSuccess: data => {
|
||||
onCreate?.(data);
|
||||
onSuccess: async newSource => {
|
||||
// Handle bidirectional linking for new sources
|
||||
const correlationFields = CORRELATION_FIELD_MAP[newSource.kind];
|
||||
if (correlationFields && sources) {
|
||||
for (const [fieldName, targetConfigs] of Object.entries(
|
||||
correlationFields,
|
||||
)) {
|
||||
const targetSourceId = (newSource as any)[fieldName];
|
||||
if (targetSourceId) {
|
||||
for (const { targetKind, targetField } of targetConfigs) {
|
||||
const targetSource = sources.find(
|
||||
s => s.id === targetSourceId,
|
||||
);
|
||||
if (targetSource && targetSource.kind === targetKind) {
|
||||
// Only update if the target field is empty to avoid overwriting existing correlations
|
||||
if (!targetSource[targetField]) {
|
||||
await updateSource.mutateAsync({
|
||||
source: {
|
||||
...targetSource,
|
||||
[targetField]: newSource.id,
|
||||
} as TSource,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
onCreate?.(newSource);
|
||||
notifications.show({
|
||||
color: 'green',
|
||||
message: 'Source created',
|
||||
|
|
@ -959,7 +1078,15 @@ export function TableSourceForm({
|
|||
},
|
||||
);
|
||||
})();
|
||||
}, [handleSubmit, createSource, onCreate, kind, formState]);
|
||||
}, [
|
||||
handleSubmit,
|
||||
createSource,
|
||||
onCreate,
|
||||
kind,
|
||||
formState,
|
||||
sources,
|
||||
updateSource,
|
||||
]);
|
||||
|
||||
const _onSave = useCallback(() => {
|
||||
clearErrors();
|
||||
|
|
|
|||
225
packages/app/src/components/__tests__/SourceForm.test.tsx
Normal file
225
packages/app/src/components/__tests__/SourceForm.test.tsx
Normal file
|
|
@ -0,0 +1,225 @@
|
|||
import { SourceKind, TSource } from '@hyperdx/common-utils/dist/types';
|
||||
|
||||
import '@testing-library/jest-dom';
|
||||
|
||||
describe('TableSourceForm Bidirectional Linking', () => {
|
||||
describe('Bidirectional Source Linking Logic', () => {
|
||||
it('should establish bidirectional link when creating a log source with trace correlation', async () => {
|
||||
const mockUpdateSourceMutateAsync = jest.fn();
|
||||
const mockOnCreate = jest.fn();
|
||||
|
||||
// Mock sources data
|
||||
const sources = [
|
||||
{
|
||||
id: 'trace-source-1',
|
||||
name: 'Trace Source 1',
|
||||
kind: SourceKind.Trace,
|
||||
logSourceId: undefined,
|
||||
},
|
||||
];
|
||||
|
||||
const newLogSource: TSource = {
|
||||
id: 'new-log-source',
|
||||
name: 'New Log Source',
|
||||
kind: SourceKind.Log,
|
||||
connection: 'conn-1',
|
||||
from: {
|
||||
databaseName: 'default',
|
||||
tableName: 'new_logs',
|
||||
},
|
||||
timestampValueExpression: 'Timestamp',
|
||||
traceSourceId: 'trace-source-1', // This will trigger bidirectional linking
|
||||
} as TSource;
|
||||
|
||||
// This is the bidirectional linking logic from the component
|
||||
const correlationFields: Record<
|
||||
string,
|
||||
Array<{ targetKind: SourceKind; targetField: string }>
|
||||
> = {
|
||||
traceSourceId: [
|
||||
{ targetKind: SourceKind.Trace, targetField: 'logSourceId' },
|
||||
],
|
||||
};
|
||||
|
||||
// Execute the bidirectional linking logic
|
||||
for (const [fieldName, targetConfigs] of Object.entries(
|
||||
correlationFields,
|
||||
)) {
|
||||
const targetSourceId = (newLogSource as any)[fieldName];
|
||||
if (targetSourceId) {
|
||||
for (const { targetKind, targetField } of targetConfigs) {
|
||||
const targetSource = sources.find(s => s.id === targetSourceId);
|
||||
if (targetSource && targetSource.kind === targetKind) {
|
||||
// Only update if the target field is empty
|
||||
if (!(targetSource as any)[targetField]) {
|
||||
await mockUpdateSourceMutateAsync({
|
||||
source: {
|
||||
...targetSource,
|
||||
[targetField]: newLogSource.id,
|
||||
} as TSource,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mockOnCreate(newLogSource);
|
||||
|
||||
// Verify that the bidirectional linking was established
|
||||
expect(mockUpdateSourceMutateAsync).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
source: expect.objectContaining({
|
||||
id: 'trace-source-1',
|
||||
logSourceId: 'new-log-source',
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
expect(mockOnCreate).toHaveBeenCalledWith(newLogSource);
|
||||
});
|
||||
|
||||
it('should establish bidirectional link when creating a trace source with log correlation', async () => {
|
||||
const mockUpdateSourceMutateAsync = jest.fn();
|
||||
const mockOnCreate = jest.fn();
|
||||
|
||||
// Mock sources data
|
||||
const sources = [
|
||||
{
|
||||
id: 'log-source-1',
|
||||
name: 'Log Source 1',
|
||||
kind: SourceKind.Log,
|
||||
traceSourceId: undefined,
|
||||
},
|
||||
];
|
||||
|
||||
const newTraceSource: TSource = {
|
||||
id: 'new-trace-source',
|
||||
name: 'New Trace Source',
|
||||
kind: SourceKind.Trace,
|
||||
connection: 'conn-1',
|
||||
from: {
|
||||
databaseName: 'default',
|
||||
tableName: 'new_traces',
|
||||
},
|
||||
timestampValueExpression: 'Timestamp',
|
||||
logSourceId: 'log-source-1', // This will trigger bidirectional linking
|
||||
} as TSource;
|
||||
|
||||
// This is the bidirectional linking logic from the component for trace sources
|
||||
const correlationFields: Record<
|
||||
string,
|
||||
Array<{ targetKind: SourceKind; targetField: string }>
|
||||
> = {
|
||||
logSourceId: [
|
||||
{ targetKind: SourceKind.Log, targetField: 'traceSourceId' },
|
||||
],
|
||||
};
|
||||
|
||||
// Execute the bidirectional linking logic
|
||||
for (const [fieldName, targetConfigs] of Object.entries(
|
||||
correlationFields,
|
||||
)) {
|
||||
const targetSourceId = (newTraceSource as any)[fieldName];
|
||||
if (targetSourceId) {
|
||||
for (const { targetKind, targetField } of targetConfigs) {
|
||||
const targetSource = sources.find(s => s.id === targetSourceId);
|
||||
if (targetSource && targetSource.kind === targetKind) {
|
||||
// Only update if the target field is empty
|
||||
if (!(targetSource as any)[targetField]) {
|
||||
await mockUpdateSourceMutateAsync({
|
||||
source: {
|
||||
...targetSource,
|
||||
[targetField]: newTraceSource.id,
|
||||
} as TSource,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mockOnCreate(newTraceSource);
|
||||
|
||||
// Verify that the bidirectional linking was established
|
||||
expect(mockUpdateSourceMutateAsync).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
source: expect.objectContaining({
|
||||
id: 'log-source-1',
|
||||
traceSourceId: 'new-trace-source',
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
expect(mockOnCreate).toHaveBeenCalledWith(newTraceSource);
|
||||
});
|
||||
|
||||
it('should not establish bidirectional link if target source already has a correlation', async () => {
|
||||
const mockUpdateSourceMutateAsync = jest.fn();
|
||||
const mockOnCreate = jest.fn();
|
||||
|
||||
// Mock sources with existing correlations
|
||||
const sources = [
|
||||
{
|
||||
id: 'log-source-1',
|
||||
name: 'Log Source 1',
|
||||
kind: SourceKind.Log,
|
||||
traceSourceId: 'existing-trace-source', // Already has a correlation
|
||||
},
|
||||
];
|
||||
|
||||
const newTraceSource: TSource = {
|
||||
id: 'new-trace-source',
|
||||
name: 'New Trace Source',
|
||||
kind: SourceKind.Trace,
|
||||
connection: 'conn-1',
|
||||
from: {
|
||||
databaseName: 'default',
|
||||
tableName: 'new_traces',
|
||||
},
|
||||
timestampValueExpression: 'Timestamp',
|
||||
logSourceId: 'log-source-1', // This should NOT establish bidirectional link
|
||||
} as TSource;
|
||||
|
||||
// This is the bidirectional linking logic from the component for trace sources
|
||||
const correlationFields: Record<
|
||||
string,
|
||||
Array<{ targetKind: SourceKind; targetField: string }>
|
||||
> = {
|
||||
logSourceId: [
|
||||
{ targetKind: SourceKind.Log, targetField: 'traceSourceId' },
|
||||
],
|
||||
};
|
||||
|
||||
// Execute the bidirectional linking logic
|
||||
for (const [fieldName, targetConfigs] of Object.entries(
|
||||
correlationFields,
|
||||
)) {
|
||||
const targetSourceId = (newTraceSource as any)[fieldName];
|
||||
if (targetSourceId) {
|
||||
for (const { targetKind, targetField } of targetConfigs) {
|
||||
const targetSource = sources.find(s => s.id === targetSourceId);
|
||||
if (targetSource && targetSource.kind === targetKind) {
|
||||
// Only update if the target field is empty
|
||||
if (!(targetSource as any)[targetField]) {
|
||||
await mockUpdateSourceMutateAsync({
|
||||
source: {
|
||||
...targetSource,
|
||||
[targetField]: newTraceSource.id,
|
||||
} as TSource,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mockOnCreate(newTraceSource);
|
||||
|
||||
// Verify that NO bidirectional linking was established since the log source already has a trace correlation
|
||||
expect(mockUpdateSourceMutateAsync).not.toHaveBeenCalled();
|
||||
|
||||
expect(mockOnCreate).toHaveBeenCalledWith(newTraceSource);
|
||||
});
|
||||
});
|
||||
});
|
||||
Loading…
Reference in a new issue