diff --git a/.changeset/curvy-snakes-unite.md b/.changeset/curvy-snakes-unite.md new file mode 100644 index 00000000..354b2cb0 --- /dev/null +++ b/.changeset/curvy-snakes-unite.md @@ -0,0 +1,6 @@ +--- +'@hyperdx/api': patch +'@hyperdx/app': patch +--- + +fix: handle py span ids diff --git a/.changeset/lovely-baboons-fetch.md b/.changeset/lovely-baboons-fetch.md new file mode 100644 index 00000000..956001d8 --- /dev/null +++ b/.changeset/lovely-baboons-fetch.md @@ -0,0 +1,6 @@ +--- +'@hyperdx/api': patch +'@hyperdx/app': patch +--- + +feat: parse lambda json message diff --git a/.changeset/stale-pears-hammer.md b/.changeset/stale-pears-hammer.md new file mode 100644 index 00000000..58a2af65 --- /dev/null +++ b/.changeset/stale-pears-hammer.md @@ -0,0 +1,6 @@ +--- +'@hyperdx/api': patch +'@hyperdx/app': patch +--- + +feat: add new k8s.pod.status_phase metrics diff --git a/docker/ingestor/core.toml b/docker/ingestor/core.toml index 9dd416cd..bd51d5df 100644 --- a/docker/ingestor/core.toml +++ b/docker/ingestor/core.toml @@ -8,6 +8,14 @@ acknowledgements.enabled = true [api] enabled = true address = "0.0.0.0:8686" + +[sources.vector_logs] + type = "internal_logs" + +# [sinks.console] +# type = "console" +# inputs = ["internal_logs"] +# encoding.codec = "json" # -------------------------------------------------------------------------------- @@ -102,7 +110,7 @@ condition = ''' [transforms.logs] type = "remap" -inputs = ["filter_logs"] +inputs = ["filter_logs", "internal_logs"] drop_on_abort = true drop_on_error = true reroute_dropped = true @@ -193,6 +201,10 @@ if .hdx_platform == "vector-internal" { .b = {} .b._hdx_body = tmp_body + .b.heroku = {} + .b.heroku.app = del(.heroku_app) + .b.heroku.dyno = tmp_attrs."7" + .b.heroku.source = tmp_attrs."6" .ts = to_unix_timestamp(parse_timestamp(del(tmp_attrs."4"), format: "%+") ?? now(), unit: "nanoseconds") if contains(.sv, "heroku") { @@ -385,13 +397,34 @@ if .hdx_platform == "vector-internal" { tmp_timestamp = to_int(.b."@timestamp") ?? 0 .b._hdx_body = .b.message .ts = to_unix_timestamp(from_unix_timestamp(tmp_timestamp, unit: "milliseconds") ?? now(), unit: "nanoseconds") - .sv = del(.b.type) - if is_nullish(.sv) { - .sv = .b.logGroup - } + structured = parse_json(.b.message) ?? null if is_object(structured) { .b = merge(.b, structured, deep: true) ?? .b + } else { + # BETA: extract json from message + # TODO: move this to post_logs if it's performant enough + if ends_with(string(.b.message) ?? "", "}") { + _SEARCH_CHARS_LENGTH = 64 + left_most_bracket_index = find(slice(.b.message, start:0, end:_SEARCH_CHARS_LENGTH) ?? "", "{") ?? -1 + if left_most_bracket_index != -1 { + tmp_json = slice(.b.message, start:left_most_bracket_index) ?? "" + structured = parse_json(tmp_json) ?? null + if is_object(structured) { + .b = merge(.b, structured, deep: true) ?? .b + } + } + } + } + + # use user-specifed service name by default + tmp_type = del(.b.type) + .sv = .b."service.name" + if is_nullish(.sv) { + .sv = tmp_type + if is_nullish(.sv) { + .sv = .b.logGroup + } } } else if .hdx_platform == "aws-sns" { .st = "ok" @@ -405,6 +438,19 @@ if .hdx_platform == "vector-internal" { del(.r."@timestamp") tmp_timestamp = del(.b."@timestamp") + # FIX: need to fix python SDK instead + if .b."telemetry.sdk.language" == "python" { + if .b."service.name" == "unknown_service" { + .b."service.name" = del(.b.otelServiceName) + } + if is_nullish(.b.span_id) { + .b.span_id = del(.b.otelSpanID) + } + if is_nullish(.b.trace_id) { + .b.trace_id = del(.b.otelTraceID) + } + } + .r.timestamp = tmp_timestamp .b.timestamp = tmp_timestamp @@ -635,22 +681,23 @@ if is_object(.r) { .b.process = tmp_process .b.__events = .r.logs + # TODO: we want to delete the redundant .b.process.tag.x fields eventually # TODO: maybe we want to move "tag" to the root level # extract k8s tags if !is_nullish(.b.process.tag."k8s.pod.name") { - .b."k8s.pod.name" = del(.b.process.tag."k8s.pod.name") + .b."k8s.pod.name" = .b.process.tag."k8s.pod.name" } if !is_nullish(.b.process.tag."k8s.pod.uid") { - .b."k8s.pod.uid" = del(.b.process.tag."k8s.pod.uid") + .b."k8s.pod.uid" = .b.process.tag."k8s.pod.uid" } if !is_nullish(.b.process.tag."k8s.namespace.name") { - .b."k8s.namespace.name" = del(.b.process.tag."k8s.namespace.name") + .b."k8s.namespace.name" = .b.process.tag."k8s.namespace.name" } if !is_nullish(.b.process.tag."k8s.node.name") { - .b."k8s.node.name" = del(.b.process.tag."k8s.node.name") + .b."k8s.node.name" = .b.process.tag."k8s.node.name" } if !is_nullish(.b.process.tag."k8s.deployment.name") { - .b."k8s.deployment.name" = del(.b.process.tag."k8s.deployment.name") + .b."k8s.deployment.name" = .b.process.tag."k8s.deployment.name" } if (.b."span.kind" == "server") { @@ -796,8 +843,69 @@ source = ''' !includes(filtered_keys, key) } .b.host = structured.host + + # Extra K8s tags + if .n == "k8s.pod.phase" { + tmp_metrics = [] + tmp_metrics = push(tmp_metrics, .) + tmp_states = [ + "pending", + "running", + "succeeded", + "failed", + "unknown" + ] + # Create new metrics "k8s.pod.status_phase" + for_each(tmp_states) -> |_index, value| { + _tmp_metric = . + _tmp_metric.n = "k8s.pod.status_phase" + _tmp_metric.v = 0 + _tmp_metric.b.state = value + if .v == 1 && value == "pending" { + _tmp_metric.v = 1 + } else if .v == 2 && value == "running" { + _tmp_metric.v = 1 + } else if .v == 3 && value == "succeeded" { + _tmp_metric.v = 1 + } else if .v == 4 && value == "failed" { + _tmp_metric.v = 1 + } else if .v == 5 && value == "unknown" { + _tmp_metric.v = 1 + } + tmp_metrics = push(tmp_metrics, _tmp_metric) + } + .__hdx_metrics = tmp_metrics + } } ''' + +[transforms.post_metrics_unnest] +type = "remap" +inputs = ["metrics"] +drop_on_abort = true +drop_on_error = true +reroute_dropped = true +source = ''' +if is_array(.__hdx_metrics) { + ., err = unnest(.__hdx_metrics) + if err != null { + log("unnest failed: " + err, level: "error") + } +} +''' + +[transforms.post_metrics] +type = "remap" +inputs = ["post_metrics_unnest"] +drop_on_abort = true +drop_on_error = true +reroute_dropped = true +source = ''' +if is_object(.__hdx_metrics) { + tmp = .__hdx_metrics + . = tmp +} +''' # -------------------------------------------------------------------------------- @@ -808,10 +916,12 @@ source = ''' type = "remap" inputs = [ "logs.dropped", - "metrics.dropped", - "post_logs.dropped", "post_logs_unnest.dropped", - "spans.dropped" + "post_logs.dropped", + "spans.dropped", + "metrics.dropped", + "post_metrics_unnest.dropped", + "post_metrics.dropped" ] source = ''' log(., level: "error") diff --git a/docker/ingestor/http-sinks.toml b/docker/ingestor/http-sinks.toml index 16e484d5..7d3527c0 100644 --- a/docker/ingestor/http-sinks.toml +++ b/docker/ingestor/http-sinks.toml @@ -1,3 +1,18 @@ +# -------------------------------------------------------------------------------- +# ------------------------------ Transforms -------------------------------------- +# -------------------------------------------------------------------------------- +[transforms.internal_logs] +type = "remap" +inputs = ["vector_logs"] +source = ''' + . = merge(., + { + "hdx_token": "${HYPERDX_API_KEY}", + "hdx_platform": "vector-internal" + } + ) +''' + [sinks.go_parser] type = "http" uri = "${GO_PARSER_API_URL}"