feat: add new k8s.pod.status_phase metrics + handle py span ids (#264)

This commit is contained in:
Warren 2024-01-21 18:08:05 -08:00 committed by GitHub
parent bb0db35f13
commit 8de2c5cf0d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 156 additions and 13 deletions

View file

@ -0,0 +1,6 @@
---
'@hyperdx/api': patch
'@hyperdx/app': patch
---
fix: handle py span ids

View file

@ -0,0 +1,6 @@
---
'@hyperdx/api': patch
'@hyperdx/app': patch
---
feat: parse lambda json message

View file

@ -0,0 +1,6 @@
---
'@hyperdx/api': patch
'@hyperdx/app': patch
---
feat: add new k8s.pod.status_phase metrics

View file

@ -8,6 +8,14 @@ acknowledgements.enabled = true
[api]
enabled = true
address = "0.0.0.0:8686"
[sources.vector_logs]
type = "internal_logs"
# [sinks.console]
# type = "console"
# inputs = ["internal_logs"]
# encoding.codec = "json"
# --------------------------------------------------------------------------------
@ -102,7 +110,7 @@ condition = '''
[transforms.logs]
type = "remap"
inputs = ["filter_logs"]
inputs = ["filter_logs", "internal_logs"]
drop_on_abort = true
drop_on_error = true
reroute_dropped = true
@ -193,6 +201,10 @@ if .hdx_platform == "vector-internal" {
.b = {}
.b._hdx_body = tmp_body
.b.heroku = {}
.b.heroku.app = del(.heroku_app)
.b.heroku.dyno = tmp_attrs."7"
.b.heroku.source = tmp_attrs."6"
.ts = to_unix_timestamp(parse_timestamp(del(tmp_attrs."4"), format: "%+") ?? now(), unit: "nanoseconds")
if contains(.sv, "heroku") {
@ -385,13 +397,34 @@ if .hdx_platform == "vector-internal" {
tmp_timestamp = to_int(.b."@timestamp") ?? 0
.b._hdx_body = .b.message
.ts = to_unix_timestamp(from_unix_timestamp(tmp_timestamp, unit: "milliseconds") ?? now(), unit: "nanoseconds")
.sv = del(.b.type)
if is_nullish(.sv) {
.sv = .b.logGroup
}
structured = parse_json(.b.message) ?? null
if is_object(structured) {
.b = merge(.b, structured, deep: true) ?? .b
} else {
# BETA: extract json from message
# TODO: move this to post_logs if it's performant enough
if ends_with(string(.b.message) ?? "", "}") {
_SEARCH_CHARS_LENGTH = 64
left_most_bracket_index = find(slice(.b.message, start:0, end:_SEARCH_CHARS_LENGTH) ?? "", "{") ?? -1
if left_most_bracket_index != -1 {
tmp_json = slice(.b.message, start:left_most_bracket_index) ?? ""
structured = parse_json(tmp_json) ?? null
if is_object(structured) {
.b = merge(.b, structured, deep: true) ?? .b
}
}
}
}
# use user-specified service name by default
tmp_type = del(.b.type)
.sv = .b."service.name"
if is_nullish(.sv) {
.sv = tmp_type
if is_nullish(.sv) {
.sv = .b.logGroup
}
}
} else if .hdx_platform == "aws-sns" {
.st = "ok"
@ -405,6 +438,19 @@ if .hdx_platform == "vector-internal" {
del(.r."@timestamp")
tmp_timestamp = del(.b."@timestamp")
# FIX: need to fix python SDK instead
if .b."telemetry.sdk.language" == "python" {
if .b."service.name" == "unknown_service" {
.b."service.name" = del(.b.otelServiceName)
}
if is_nullish(.b.span_id) {
.b.span_id = del(.b.otelSpanID)
}
if is_nullish(.b.trace_id) {
.b.trace_id = del(.b.otelTraceID)
}
}
.r.timestamp = tmp_timestamp
.b.timestamp = tmp_timestamp
@ -635,22 +681,23 @@ if is_object(.r) {
.b.process = tmp_process
.b.__events = .r.logs
# TODO: we want to delete the redundant .b.process.tag.x fields eventually
# TODO: maybe we want to move "tag" to the root level
# extract k8s tags
if !is_nullish(.b.process.tag."k8s.pod.name") {
.b."k8s.pod.name" = del(.b.process.tag."k8s.pod.name")
.b."k8s.pod.name" = .b.process.tag."k8s.pod.name"
}
if !is_nullish(.b.process.tag."k8s.pod.uid") {
.b."k8s.pod.uid" = del(.b.process.tag."k8s.pod.uid")
.b."k8s.pod.uid" = .b.process.tag."k8s.pod.uid"
}
if !is_nullish(.b.process.tag."k8s.namespace.name") {
.b."k8s.namespace.name" = del(.b.process.tag."k8s.namespace.name")
.b."k8s.namespace.name" = .b.process.tag."k8s.namespace.name"
}
if !is_nullish(.b.process.tag."k8s.node.name") {
.b."k8s.node.name" = del(.b.process.tag."k8s.node.name")
.b."k8s.node.name" = .b.process.tag."k8s.node.name"
}
if !is_nullish(.b.process.tag."k8s.deployment.name") {
.b."k8s.deployment.name" = del(.b.process.tag."k8s.deployment.name")
.b."k8s.deployment.name" = .b.process.tag."k8s.deployment.name"
}
if (.b."span.kind" == "server") {
@ -796,8 +843,69 @@ source = '''
!includes(filtered_keys, key)
}
.b.host = structured.host
# Extra K8s tags
if .n == "k8s.pod.phase" {
tmp_metrics = []
tmp_metrics = push(tmp_metrics, .)
tmp_states = [
"pending",
"running",
"succeeded",
"failed",
"unknown"
]
# Create new metrics "k8s.pod.status_phase"
for_each(tmp_states) -> |_index, value| {
_tmp_metric = .
_tmp_metric.n = "k8s.pod.status_phase"
_tmp_metric.v = 0
_tmp_metric.b.state = value
if .v == 1 && value == "pending" {
_tmp_metric.v = 1
} else if .v == 2 && value == "running" {
_tmp_metric.v = 1
} else if .v == 3 && value == "succeeded" {
_tmp_metric.v = 1
} else if .v == 4 && value == "failed" {
_tmp_metric.v = 1
} else if .v == 5 && value == "unknown" {
_tmp_metric.v = 1
}
tmp_metrics = push(tmp_metrics, _tmp_metric)
}
.__hdx_metrics = tmp_metrics
}
}
'''
# Fan-out step: the upstream "metrics" transform stores derived
# k8s.pod.status_phase metrics as an array in .__hdx_metrics; unnest
# emits one event per array element. Events without that array pass
# through unchanged.
[transforms.post_metrics_unnest]
type = "remap"
inputs = ["metrics"]
# Route failed/aborted events to post_metrics_unnest.dropped (consumed
# by the dropped-events logger) instead of discarding them silently.
drop_on_abort = true
drop_on_error = true
reroute_dropped = true
source = '''
if is_array(.__hdx_metrics) {
., err = unnest(.__hdx_metrics)
if err != null {
log("unnest failed: " + err, level: "error")
}
}
'''
# After unnesting, each emitted event wraps its real payload under
# .__hdx_metrics; promote that object to the event root so downstream
# sinks see a plain metric event.
[transforms.post_metrics]
type = "remap"
inputs = ["post_metrics_unnest"]
# Same dropped-event routing as the other remap transforms in this file.
drop_on_abort = true
drop_on_error = true
reroute_dropped = true
source = '''
if is_object(.__hdx_metrics) {
tmp = .__hdx_metrics
. = tmp
}
'''
# --------------------------------------------------------------------------------
@ -808,10 +916,12 @@ source = '''
type = "remap"
inputs = [
"logs.dropped",
"metrics.dropped",
"post_logs.dropped",
"post_logs_unnest.dropped",
"spans.dropped"
"post_logs.dropped",
"spans.dropped",
"metrics.dropped",
"post_metrics_unnest.dropped",
"post_metrics.dropped"
]
source = '''
log(., level: "error")

View file

@ -1,3 +1,18 @@
# --------------------------------------------------------------------------------
# ------------------------------ Transforms --------------------------------------
# --------------------------------------------------------------------------------
# Tag Vector's own internal logs (from the vector_logs internal_logs
# source) so the shared pipeline treats them like tenant data:
# hdx_platform = "vector-internal" selects the matching branch in the
# logs transform, and hdx_token carries the API key for ingestion.
# NOTE(review): ${HYPERDX_API_KEY} is expanded by Vector at load time,
# not by TOML itself — confirm the env var is set in the deployment.
[transforms.internal_logs]
type = "remap"
inputs = ["vector_logs"]
source = '''
. = merge(.,
{
"hdx_token": "${HYPERDX_API_KEY}",
"hdx_platform": "vector-internal"
}
)
'''
[sinks.go_parser]
type = "http"
uri = "${GO_PARSER_API_URL}"