# hyperdx/docker/ingestor/core.toml

# Set global options
data_dir = "/var/lib/vector"
acknowledgements.enabled = true
# --------------------------------------------------------------------------------
# ------------------------------ Sources -----------------------------------------
# --------------------------------------------------------------------------------
[api]
enabled = true
address = "0.0.0.0:8686"
[sources.vector_logs]
type = "internal_logs"
# [sinks.console]
# type = "console"
# inputs = ["internal_logs"]
# encoding.codec = "json"
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
# ------------------------------ Middleware --------------------------------------
# --------------------------------------------------------------------------------
[transforms.internal_logs]
type = "remap"
inputs = ["vector_logs"]
source = '''
. = merge(.,
{
"hdx_token": "${HYPERDX_API_KEY-tacocat}",
"hdx_platform": "vector-internal"
}
)
'''
# WARNING: used for logs and spans only
[transforms.process_headers_n_params]
type = "remap"
inputs = ["http_server"]
source = '''
.hdx_content_type = del(."Content-Type")
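# W3C traceparent format: "{version}-{trace-id}-{parent-id}-{flags}", e.g.
# "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"; index 1 of the
# split is the 32-hex-char trace id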
.hdx_trace_id = split(del(.Traceparent), "-")[1] ?? null
tmp_sentry_auth = del(."X-Sentry-Auth")
# Hack: accept Sentry clients that authenticate via the X-Sentry-Auth header 😎
# (e.g. sentry-ruby)
if is_string(tmp_sentry_auth) {
# input example: Sentry sentry_version=7, sentry_client=sentry-ruby/5.18.1, sentry_timestamp=1720998946, sentry_key=blabla
tmp_sentry_auth_headers = parse_key_value(tmp_sentry_auth, field_delimiter:",") ?? null
if is_object(tmp_sentry_auth_headers) {
.sentry_version = tmp_sentry_auth_headers."Sentry sentry_version"
.sentry_client = tmp_sentry_auth_headers.sentry_client
.sentry_key = tmp_sentry_auth_headers.sentry_key
tmp_timestamp = tmp_sentry_auth_headers.sentry_timestamp
if !is_nullish(tmp_timestamp) {
# override timestamp so its type is consistent
.b.timestamp = tmp_timestamp
}
}
}
if is_string(.sentry_key) && length(to_string(.sentry_key) ?? "") == 32 {
.hdx_content_type = "application/json"
.hdx_platform = "sentry"
.hdx_token = (slice(.sentry_key, start: 0, end: 8) + "-" + slice(.sentry_key, start: 8, end: 12) + "-" + slice(.sentry_key, start: 12, end: 16) + "-" + slice(.sentry_key, start: 16, end: 20) + "-" + slice(.sentry_key, start: 20, end: 32)) ?? null
}
'''
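# Parse JSON request bodies and fan batched top-level arrays out into one event per element.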
[transforms.unnest_jsons]
type = "remap"
inputs = ["process_headers_n_params"]
source = '''
if !includes(["otel-metrics"], .hdx_platform) && .hdx_content_type == "application/json" {
structured, err = parse_json(.message)
if err != null {
log("Unable to parse JSON: " + err, level: "warn")
} else {
.message = structured
if is_array(.message) {
., err = unnest(.message)
if err != null {
log("unnest failed: " + err, level: "error")
}
}
}
}
'''
[transforms.extract_token]
type = "remap"
inputs = ["unnest_jsons"]
source = '''
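# Token lookup order: header-derived .hdx_token; for otel-traces, the
# JaegerTag.__HDX_API_KEY field; otherwise the second word of the
# Authorization header (e.g. "Bearer <token>"), then .message.__HDX_API_KEY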
if is_nullish(.hdx_token) {
if includes(["otel-traces"], .hdx_platform) {
.hdx_token = del(.message.JaegerTag.__HDX_API_KEY)
} else {
.hdx_token = split(del(.authorization), " ")[1] ?? null
if is_nullish(.hdx_token) {
.hdx_token = del(.message.__HDX_API_KEY)
}
}
# TODO: support metrics
}
# attach a fake token for local mode
if is_nullish(.hdx_token) && "${IS_LOCAL_APP_MODE:-false}" == "true" {
.hdx_token = "00000000-0000-4000-a000-000000000000"
}
# check if token is in uuid format
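# (lowercase UUIDv4 shape, e.g. "9f86d081-7a3b-4c52-a1b2-3c4d5e6f7a8b";
# the version nibble must be 4 and the variant nibble 8/9/a/b)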
if "${IS_LOCAL_APP_MODE:-false}" == "false" && !match(to_string(.hdx_token) ?? "", r'^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$') {
log("Invalid token: " + (to_string(.hdx_token) ?? ""), level: "warn")
.hdx_token = null
}
if !is_nullish(.hdx_token) {
.hdx_token_hash = md5(.hdx_token)
}
'''
[transforms.pre_filter]
type = "remap"
inputs = ["extract_token"]
source = '''
hdx_message_size = strlen(encode_json(.))
if hdx_message_size > 6000000 {
token = to_string(.hdx_token) ?? ""
log("Message size too large: " + to_string(hdx_message_size) + " bytes, token = " + token, level: "warn")
# HACK: so the downstream filter can drop it
.message = {}
.hdx_token = null
}
'''
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
# ------------------------------ Logs Transform ----------------------------------
# --------------------------------------------------------------------------------
[transforms.filter_logs]
type = "filter"
inputs = ["pre_filter"]
condition = '''
!includes(["otel-traces", "otel-metrics"], .hdx_platform) && !is_nullish(.hdx_token)
'''
[transforms.logs]
type = "remap"
inputs = ["filter_logs", "internal_logs"]
drop_on_abort = true
drop_on_error = true
reroute_dropped = true
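# Shorthand output fields (inferred from usage below): .r = raw event,
# .b = structured body, .h = host, .sv = service name, .st = severity text,
# .ts/.tso = event/observed timestamp (ns), .sn = severity number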
source = '''
.r = del(.message)
if .hdx_platform == "vector-internal" {
.ts = to_unix_timestamp(parse_timestamp(del(.timestamp), format: "%+") ?? now(), unit: "nanoseconds")
.b = .
.b.message = .r
.b._hdx_body = .r
.h = .b.host
.st = downcase(.b.metadata.level) ?? null
.sv = .hdx_platform
del(.b.r)
del(.b.hdx_platform)
del(.b.hdx_token)
.r = .b
} else if .hdx_platform == "sentry" {
.b = .r
if (to_int(.sentry_version) ?? 0) >= 7 && is_object(.b.contexts) {
.ts = (to_float(.b.timestamp) ?? 0.0) * 1000000000
.b._hdx_body = .b.message
.h = .b.server_name
.st = downcase(.b.level) ?? "fatal"
# extract service name
if exists(.b.contexts.hyperdx.serviceName) {
.sv = .b.contexts.hyperdx.serviceName
} else {
.sv = .b.platform
}
# extract request metadata
if is_object(.b.request) {
.b.span.kind = "server" # only for UI
.b.http.method = upcase(.b.request.method) ?? ""
.b.http.url = .b.request.url
.b.http.request.header = .b.request.headers
del(.b.request)
}
# extract body message
if is_nullish(.b._hdx_body) {
if is_array(.b.exception.values) {
exception_type = .b.exception.values[0].type
exception_value = .b.exception.values[0].value
if is_string(.b.transaction) {
exception_value = .b.transaction
}
.b._hdx_body = (exception_type + ": " + exception_value) ?? ""
} else {
.b._hdx_body = "<unlabeled event>"
}
}
# user tags
if is_object(.b.user) {
.b.userEmail = del(.b.user.email)
.b.userId = del(.b.user.id)
.b.userName = del(.b.user.username)
}
# promote span id and trace id
.b.span_id = .b.contexts.trace.span_id
.b.trace_id = .hdx_trace_id # use "Traceparent" header
if is_nullish(.b.trace_id) {
.b.trace_id = .b.contexts.trace.trace_id
}
}
} else if .hdx_content_type == "application/logplex-1" {
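# Heroku Logplex (syslog-style) frame, e.g.:
#   "83 <40>1 2012-11-30T06:45:29+00:00 host app web.3 - State changed from starting to up"
# captures: 1 = frame length, 2 = priority, 3 = syslog version, 4 = timestamp,
#           5 = host, 6 = appname/source, 7 = procid/dyno, 8 = message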
tmp_raw_msg = string(.r) ?? ""
tmp_attrs = parse_regex(tmp_raw_msg, r'^(\d+) <(\d+)>(\d+) (\S+) (\S+) (\S+) (\S+) - (.*)', numeric_groups: true) ?? null
if !is_object(tmp_attrs) {
log("Unable to parse logplex: '" + tmp_raw_msg + "', token: " + to_string(.hdx_token) ?? "", level: "warn")
del(.hdx_token)
del(.hdx_platform)
} else {
# convert .r into an object
.r.message = tmp_raw_msg
tmp_priority = to_int(tmp_attrs."2") ?? null
if is_integer(tmp_priority) {
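# syslog priority = facility * 8 + severity (RFC 5424); mod 8 recovers the
# severity, defaulting to 6 (info)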
.st = to_syslog_level(mod(tmp_priority, 8) ?? 6) ?? null
}
.h = tmp_attrs."5"
.sv = (tmp_attrs."6" + "-" + tmp_attrs."7") ?? ""
tmp_body = tmp_attrs."8"
.b = {}
.b._hdx_body = tmp_body
.b.heroku = {}
.b.heroku.app = del(.heroku_app)
.b.heroku.dyno = tmp_attrs."7"
.b.heroku.source = tmp_attrs."6"
.ts = to_unix_timestamp(parse_timestamp(del(tmp_attrs."4"), format: "%+") ?? now(), unit: "nanoseconds")
if contains(.sv, "heroku") {
structured = parse_key_value(tmp_body, accept_standalone_key: false) ?? null
if is_object(structured) {
.b = merge(.b, structured, deep: true) ?? .b
.h = .b.host
}
if exists(.b.at) {
.st = downcase(.b.at) ?? null
}
# parse integer fields
tmp_int_fields = [
"connect",
"service",
"status",
"bytes"
]
for_each(tmp_int_fields) -> |_index, field| {
tmp_value = get(value: .b, path: [field]) ?? null
tmp_parsed_values = parse_regex(tmp_value, r'(\d+)', numeric_groups: true) ?? null
if is_object(tmp_parsed_values) {
tmp_parsed_value = to_int(tmp_parsed_values."0") ?? null
if is_integer(tmp_parsed_value) {
.b = set(value: .b, path: [field], data: tmp_parsed_value) ?? .b
}
}
}
for_each(.b) -> |key, value| {
if starts_with(key, "sample#") {
tmp_parsed_values = parse_regex(value, r'[-+]?(?:\d*\.*\d+)', numeric_groups: true) ?? null
if is_object(tmp_parsed_values) {
tmp_parsed_value = to_float(tmp_parsed_values."0") ?? null
if is_float(tmp_parsed_value) {
.b = set(value: .b, path: [key], data: tmp_parsed_value) ?? .b
}
}
}
}
} else {
parsed = false
# ruby logs
if !parsed {
structured = parse_regex(tmp_body, r'\[(.*) #(\d+)\]\s*(\S+) -- : (.*)', numeric_groups: true) ?? null
if is_object(structured) {
parsed = true
.b.pid = structured."2"
.b._hdx_body = structured."4"
.st = downcase(structured."3") ?? null
}
}
# JSON
if !parsed {
structured = parse_json(tmp_body) ?? null
if is_object(structured) {
parsed = true
.b = merge(.b, structured, deep: true) ?? .b
if !is_nullish(.b.message) {
.b._hdx_body = .b.message
}
if !is_nullish(.b.level) {
.st = downcase(.b.level) ?? null
}
}
}
}
}
} else if contains(to_string(.hdx_content_type) ?? "", "text/plain", case_sensitive: false) {
.b = parse_json(.r) ?? .r
if is_object(.b) {
.r = .b
if .hdx_platform == "go" {
.b._hdx_body = .b.message
.st = downcase(.b.level) ?? null
.sv = del(.b.__hdx_sv)
.h = del(.b.__hdx_h)
.ts = to_unix_timestamp(parse_timestamp(.b.ts, format: "%+") ?? now(), unit: "nanoseconds")
}
}
} else if .hdx_content_type == "application/json" {
.b = .r
if is_object(.b) {
if .hdx_platform == "nodejs" {
tmp_hdx = del(.b.__hdx)
.r = .b
.b._hdx_body = tmp_hdx.b
.h = tmp_hdx.h
.st = downcase(tmp_hdx.st) ?? null
.sv = tmp_hdx.sv
.ts = to_unix_timestamp(parse_timestamp(tmp_hdx.ts, format: "%+") ?? now(), unit: "nanoseconds")
} else if .hdx_platform == "elixir" {
.h = del(.b.__hdx_h)
.st = downcase(.b.erl_level) ?? downcase(.b.level) ?? null
.sv = del(.b.__hdx_sv)
.ts = to_unix_timestamp(parse_timestamp(.b.timestamp, format: "%+") ?? now(), unit: "nanoseconds")
.b._hdx_body = .b.message
structured = parse_json(.b.message) ?? null
if is_object(structured) {
.b = merge(.b, structured, deep: true) ?? .b
}
} else if .hdx_platform == "flyio" {
.h = .b.host
tmp_level = downcase(.b.log.level) ?? downcase(.b.log.severity) ?? "info"
if tmp_level != "info" {
.st = tmp_level
}
.ts = to_unix_timestamp(parse_timestamp(.b.timestamp, format: "%+") ?? now(), unit: "nanoseconds")
.b._hdx_body = .b.message
structured = parse_json(.b.message) ?? null
if is_object(structured) {
.b = merge(.b, structured, deep: true) ?? .b
}
# use the user-specified service name by default
.sv = .b."service.name"
if is_nullish(.sv) {
.sv = .b.fly.app.name
}
# TODO: maybe move this to post_logs
if !is_nullish(.b.message) {
.b._hdx_body = .b.message
}
} else if .hdx_platform == "vercel" {
.h = .b.host
.st = downcase(.b.level) ?? downcase(.b.severity) ?? null
.sv = .b.projectName
.ts = (to_int(.b.timestamp) ?? 0) * 1000000
# default to message
.b._hdx_body = .b.message
# build a custom message (Apache access log style)
if exists(.b.proxy) {
# set status code
tmp_status = to_int(.b.proxy.statusCode) ?? 200
if tmp_status >= 500 {
.st = "error"
}
.b._hdx_body = .b.proxy.clientIp + " - " + "\"" + (join([
.b.proxy.method,
.b.proxy.path,
], separator: " ") ?? "") + "\"" + " " + to_string(.b.proxy.statusCode) ?? ""
}
# attach trace id
.b.trace_id = .b.requestId
# extract more props
if .b.source == "lambda" {
.b.Duration = to_float(parse_regex(.b.message, r'Duration: (?P<d>.*?) ms').d ?? null) ?? null
.b.BilledDuration = to_int(parse_regex(.b.message, r'Billed Duration: (?P<d>.*?) ms').d ?? null) ?? null
.b.InitDuration = to_float(parse_regex(.b.message, r'Init Duration: (?P<d>.*?) ms').d ?? null) ?? null
.b.MemorySize = to_int(parse_regex(.b.message, r'Memory Size: (?P<d>.*?) MB').d ?? null) ?? null
.b.MaxMemoryUsed = to_int(parse_regex(.b.message, r'Max Memory Used: (?P<d>.*?) MB').d ?? null) ?? null
tmp_splits = split(.b.message, "\n") ?? []
tmp_logs = []
for_each(tmp_splits) -> |_index, value| {
if !is_nullish(value) {
tmp_cur_log = {}
tmp_cur_log._hdx_body = value
tmp_msg_splits = split(value, "\t")
for_each(tmp_msg_splits) -> |__index, tmp_msg_split| {
if starts_with(tmp_msg_split, "{") && ends_with(tmp_msg_split, "}") {
_structured = parse_json(tmp_msg_split) ?? null
if is_object(_structured) {
tmp_cur_log = merge(tmp_cur_log, _structured, deep: false) ?? tmp_cur_log
}
}
}
tmp_logs = push(tmp_logs, tmp_cur_log)
}
}
if length(tmp_logs) > 0 {
# will be split into multiple logs
.__hdx_logs = tmp_logs
}
}
} else if .hdx_platform == "aws-lambda" {
tmp_timestamp = to_int(.b."@timestamp") ?? 0
.b._hdx_body = .b.message
.ts = to_unix_timestamp(from_unix_timestamp(tmp_timestamp, unit: "milliseconds") ?? now(), unit: "nanoseconds")
structured = parse_json(.b.message) ?? null
if is_object(structured) {
.b = merge(.b, structured, deep: true) ?? .b
} else {
# BETA: extract json from message
# TODO: move this to post_logs if it's performant enough
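# e.g. '2023-01-01T00:00:00.000Z  INFO  {"foo": "bar"}' ends with "}" and has
# a "{" within the first 64 chars, so {"foo": "bar"} is parsed and merged into .b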
if ends_with(string(.b.message) ?? "", "}") {
_SEARCH_CHARS_LENGTH = 64
left_most_bracket_index = find(slice(.b.message, start:0, end:_SEARCH_CHARS_LENGTH) ?? "", "{") ?? -1
if left_most_bracket_index != -1 {
tmp_json = slice(.b.message, start:left_most_bracket_index) ?? ""
structured = parse_json(tmp_json) ?? null
if is_object(structured) {
.b = merge(.b, structured, deep: true) ?? .b
}
}
}
}
# use the user-specified service name by default
tmp_type = del(.b.type)
.sv = .b."service.name"
if is_nullish(.sv) {
.sv = tmp_type
if is_nullish(.sv) {
.sv = .b.logGroup
}
}
} else if .hdx_platform == "aws-sns" {
.st = "ok"
.b._hdx_body = .b.Message
.ts = to_unix_timestamp(parse_timestamp(.b.Timestamp, format: "%+") ?? now(), unit: "nanoseconds")
structured = parse_json(.b.Message) ?? null
if is_object(structured) {
.b = merge(.b, structured, deep: true) ?? .b
}
} else if .hdx_platform == "otel-logs" {
del(.r."@timestamp")
tmp_timestamp = del(.b."@timestamp")
# FIX: need to fix python SDK instead
if .b."telemetry.sdk.language" == "python" {
if .b."service.name" == "unknown_service" {
.b."service.name" = del(.b.otelServiceName)
}
if is_nullish(.b.span_id) {
.b.span_id = del(.b.otelSpanID)
}
if is_nullish(.b.trace_id) {
.b.trace_id = del(.b.otelTraceID)
}
}
.h = .b.host
.sv = .b."service.name"
.ts = to_unix_timestamp(from_unix_timestamp(tmp_timestamp, unit: "milliseconds") ?? now(), unit: "nanoseconds")
.b._hdx_body = .b.message
structured = parse_json(.b.message) ?? null
if is_object(structured) {
.b = merge(.b, structured, deep: true) ?? .b
}
# extract k8s event metadata
# TODO: should we check "k8s.event.name" instead?
if .b."k8s.resource.name" == "events" && .b.object.kind == "Event" {
# set severity
if is_nullish(.b.level) && (.b.object.type == "Warning" || .b.object.type == "Normal") {
.b.level = .b.object.type
}
# check the event API version (legacy core/v1 events vs events.k8s.io/v1 events)
_targetObject = {}
if .b.object.apiVersion == "events.k8s.io/v1" && is_object(.b.object.regarding) {
_targetObject = .b.object.regarding
.b._hdx_body = .b.object.note
} else if .b.object.apiVersion == "v1" && is_object(.b.object.involvedObject) {
_targetObject = .b.object.involvedObject
.b._hdx_body = .b.object.message
}
# transform the attributes so that the log events use the k8s.* semantic conventions
# ref: https://docs.honeycomb.io/integrations/kubernetes/kubernetes-events/
if _targetObject.kind == "Pod" {
.b."k8s.pod.name" = _targetObject.name
.b."k8s.pod.uid" = _targetObject.uid
.b."k8s.namespace.name" = _targetObject.namespace
} else if _targetObject.kind == "Node" {
.b."k8s.node.name" = _targetObject.name
.b."k8s.node.uid" = _targetObject.uid
} else if _targetObject.kind == "Job" {
.b."k8s.job.name" = _targetObject.name
.b."k8s.job.uid" = _targetObject.uid
.b."k8s.namespace.name" = _targetObject.namespace
} else if _targetObject.kind == "CronJob" {
.b."k8s.cronjob.name" = _targetObject.name
.b."k8s.cronjob.uid" = _targetObject.uid
.b."k8s.namespace.name" = _targetObject.namespace
}
}
# set severity after merging structured message (to avoid conflict)
.st = downcase(.b.level) ?? null
if exists(.b."rr-web.event") {
.hdx_platform = "rrweb"
temp_msg = .b.message
temp_msg_json = parse_json(temp_msg) ?? null
temp_rum_session_id = .b."rum.sessionId"
temp_rum_script_instance = .b."rum.scriptInstance"
temp_rrweb = {
"event": .b."rr-web.event",
"offset": .b."rr-web.offset",
"chunk": .b."rr-web.chunk",
"total-chunks": .b."rr-web.total-chunks"
}
.b = {}
.b._hdx_body = temp_msg
if is_object(temp_msg_json) {
.b.type = temp_msg_json.type
}
.b."rum.sessionId" = temp_rum_session_id
.b."rum.scriptInstance" = temp_rum_script_instance
.b."rr-web" = temp_rrweb
}
}
} else {
del(.b)
}
}
del(.source_type)
del(.timestamp)
del(.authorization)
del(.hdx_content_type)
del(.hdx_trace_id)
del(.sentry_client)
del(.sentry_key)
del(.sentry_version)
'''
[transforms.post_logs_unnest]
type = "remap"
inputs = ["logs"]
drop_on_abort = true
drop_on_error = true
reroute_dropped = true
source = '''
if is_array(.__hdx_logs) {
., err = unnest(.__hdx_logs)
if err != null {
log("unnest failed: " + err, level: "error")
}
}
'''
[transforms.post_logs]
type = "remap"
inputs = ["post_logs_unnest"]
drop_on_abort = true
drop_on_error = true
reroute_dropped = true
source = '''
# extract shared fields
if is_object(.b) {
# extract span/trace id
.s_id = string(.b.span_id) ?? string(.b.spanID) ?? null
.t_id = string(.b.trace_id) ?? string(.b.traceID) ?? null
del(.b.spanID)
del(.b.span_id)
del(.b.traceID)
del(.b.trace_id)
.tso = to_unix_timestamp(now(), unit: "nanoseconds")
if is_nullish(.st) {
.st = downcase(.b.level) ?? downcase(.b.severity) ?? downcase(.b.LEVEL) ?? downcase(.b.SEVERITY) ?? null
}
# address .b.level and .st conflict
if !is_nullish(.b.level) && .b.level != .st {
.b.level = .st
}
# merge vercel logs
if is_object(.__hdx_logs) {
tmp_b_size = strlen(encode_json(.b))
tmp_hdx_logs_size = strlen(encode_json(.__hdx_logs))
tmp_total_size = tmp_b_size + tmp_hdx_logs_size
# Max expanded log size 16MB
if tmp_total_size > 16000000 {
log("__hdx_logs + body size too large: " + to_string(tmp_total_size) + " bytes, token = " + to_string(.hdx_token) ?? "", level: "warn")
del(.__hdx_logs)
} else {
.b = merge(.b, del(.__hdx_logs), deep: false) ?? .b
}
.b.message = .b._hdx_body
if is_object(.r) {
.r.message = .b._hdx_body
}
}
# add _hdx_body to raw logs (make it searchable)
if is_object(.r) {
if !is_nullish(.b._hdx_body) && !contains(encode_json(.r), to_string(.b._hdx_body) ?? "", case_sensitive: false) {
.r._hdx_body = .b._hdx_body
}
}
}
# validate timestamp and set it to observed timestamp if it is invalid
if exists(.ts) {
a_day_from_now = to_unix_timestamp(now(), unit: "nanoseconds") + 86400000000000
ninety_days_ago = to_unix_timestamp(now(), unit: "nanoseconds") - 90 * 86400000000000
if (.ts > a_day_from_now ?? false) || (.ts < ninety_days_ago ?? false) {
.ts = .tso
}
}
# infer log level for raw logs
if is_nullish(.st) || !is_string(.st) {
header = ""
if is_object(.r) && !is_nullish(.b._hdx_body) {
header = slice(to_string(.b._hdx_body) ?? "", start: 0, end: 256) ?? ""
} else {
header = slice(to_string(.r) ?? "", start: 0, end: 256) ?? ""
}
# infer the level, checking matches in decreasing order of severity
if contains(header, "emerg", case_sensitive: false) {
.st = "emergency"
} else if contains(header, "alert", case_sensitive: false) {
.st = "alert"
} else if contains(header, "crit", case_sensitive: false) {
.st = "critical"
} else if contains(header, "fatal", case_sensitive: false) {
.st = "fatal"
} else if contains(header, "error", case_sensitive: false) {
.st = "error"
} else if contains(header, "warn", case_sensitive: false) {
.st = "warn"
} else if contains(header, "notice", case_sensitive: false) {
.st = "notice"
} else if contains(header, "info", case_sensitive: false) {
.st = "info"
} else if contains(header, "debug", case_sensitive: false) {
.st = "debug"
} else if contains(header, "trace", case_sensitive: false) {
.st = "trace"
} else {
.st = "info"
}
}
# TODO: compute sn ?
.sn = 0
'''
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
# ----------------------------- Spans Transform ----------------------------------
# --------------------------------------------------------------------------------
[transforms.filter_spans]
type = "filter"
inputs = ["pre_filter"]
condition = '''
includes(["otel-traces"], .hdx_platform) && !is_nullish(.hdx_token)
'''
[transforms.spans]
type = "remap"
inputs = ["filter_spans"]
drop_on_abort = true
drop_on_error = true
reroute_dropped = true
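# Additional span shorthand (inferred from usage below): .s_n = span name,
# .s_id/.t_id/.p_id = span/trace/parent span ids, .ts/.et = start/end time (ns)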
source = '''
.r = del(.message)
if is_object(.r) {
tmp_JaegerTag = map_keys(object(.r.JaegerTag) ?? {}, recursive: true) -> |key| { replace(key,"@",".") }
tmp_process = map_keys(object(.r.process) ?? {}, recursive: true) -> |key| { replace(key,"@",".") }
tmp_timestamp = del(.r."@timestamp")
.r.timestamp = tmp_timestamp
.r.JaegerTag = tmp_JaegerTag
.r.process = tmp_process
.s_n = .r.operationName
.s_id = .r.spanID
.t_id = .r.traceID
.p_id = null
ref = .r.references[0]
if (ref != null && ref.refType == "CHILD_OF") {
.p_id = ref.spanID
}
.b = tmp_JaegerTag
.b.process = tmp_process
.b.__events = .r.logs
# TODO: we want to delete the redundant .b.process.tag.x fields eventually
# TODO: maybe we want to move "tag" to the root level
# extract k8s tags
if !is_nullish(.b.process.tag."k8s.pod.name") {
.b."k8s.pod.name" = .b.process.tag."k8s.pod.name"
}
if !is_nullish(.b.process.tag."k8s.pod.uid") {
.b."k8s.pod.uid" = .b.process.tag."k8s.pod.uid"
}
if !is_nullish(.b.process.tag."k8s.namespace.name") {
.b."k8s.namespace.name" = .b.process.tag."k8s.namespace.name"
}
if !is_nullish(.b.process.tag."k8s.node.name") {
.b."k8s.node.name" = .b.process.tag."k8s.node.name"
}
if !is_nullish(.b.process.tag."k8s.deployment.name") {
.b."k8s.deployment.name" = .b.process.tag."k8s.deployment.name"
}
# copy common resource attributes to the top level
if is_nullish(.b."deployment.environment") && !is_nullish(.b.process.tag."deployment.environment") {
.b."deployment.environment" = .b.process.tag."deployment.environment"
}
if (.b."span.kind" == "server") {
if (exists(.b."http.status_code") && exists(.b."http.method") && exists(.b."http.route")) {
.b._hdx_body = join([
to_string(.b."http.status_code") ?? "",
.b."http.method",
.b."http.route",
], separator: " ") ?? .s_n
}
} else if (.b."span.kind" == "client") {
if (exists(.b."http.status_code") && exists(.b."http.method") && exists(.b."http.url")) {
.b._hdx_body = join([
to_string(.b."http.status_code") ?? "",
.b."http.method",
.b."http.url",
], separator: " ") ?? .s_n
}
} else if (.b."span.kind" == "internal") {
if .b.component == "console" {
.b._hdx_body = .b.message
.st = .b.level
}
}
if ((to_int(.b.error) ?? 0) == 1) {
.st = "error"
if !is_nullish(.b."otel.status_description") {
.b._hdx_body = .b."otel.status_description"
} else if !is_nullish(.b."error.message") {
.b._hdx_body = .b."error.message"
}
} else if is_array(.b.__events) {
for_each(array(.b.__events) ?? []) -> |_index, value| {
if is_object(value) {
if (value.fields[0].key == "event" && value.fields[0].value == "exception") {
.st = "error"
}
}
}
}
# set default values
if is_nullish(.st) {
.st = "ok"
}
if is_nullish(.b._hdx_body) {
.b._hdx_body = .s_n
}
# React Native (RN) instrumentation
if exists(.b."process.serviceName") {
.sv = .b."process.serviceName"
} else {
.sv = .r.process.serviceName
}
.ts = to_unix_timestamp(from_unix_timestamp(.r.startTimeMillis, unit: "milliseconds") ?? now(), unit: "nanoseconds")
.et = .ts + .r.duration * 1000 ?? 0
.tso = to_unix_timestamp(now(), unit: "nanoseconds")
# TODO: move this to post_spans
# add _hdx_body to raw log
if !is_nullish(.b._hdx_body) && !contains(encode_json(.r), to_string(.b._hdx_body) ?? "", case_sensitive: false) {
.r._hdx_body = .b._hdx_body
}
del(.source_type)
del(.timestamp)
del(.authorization)
del(.hdx_content_type)
}
'''
[transforms.post_spans]
type = "filter"
inputs = ["spans"]
condition = '''
("${ENABLE_GO_PARSER:-false}" == "true" && is_nullish(.b."db.statement")) || "${ENABLE_GO_PARSER:-false}" == "false"
'''
[transforms.go_spans]
type = "filter"
inputs = ["spans"]
condition = '''
"${ENABLE_GO_PARSER:-false}" == "true" && !is_nullish(.b."db.statement")
'''
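# With ENABLE_GO_PARSER=true, spans carrying a "db.statement" attribute are
# routed to go_spans (presumably consumed by a downstream Go-based SQL parser,
# not shown here); everything else continues through post_spans.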
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
# ---------------------------- Metrics Transform ---------------------------------
# --------------------------------------------------------------------------------
[transforms.filter_metrics]
type = "filter"
inputs = ["http_server"]
condition = '''
includes(["otel-metrics"], .hdx_platform)
'''
[transforms.pre_metrics]
type = "remap"
inputs = ["filter_metrics"]
source = '''
del(.path)
del(.source_type)
del(.timestamp)
del(.authorization)
del(.hdx_content_type)
tmp_msg = del(.message)
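# the body is a stream of concatenated JSON objects; splitting on "}}" strips
# the closing braces, which [transforms.metrics] re-appends before parsing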
.message = split(tmp_msg, "}}") ?? []
. = unnest(.message)
'''
[transforms.metrics]
type = "remap"
inputs = ["pre_metrics"]
drop_on_abort = true
drop_on_error = true
reroute_dropped = true
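# Metric shorthand (inferred from the source field names below): .n = name,
# .v = value, .u = unit, .dt = metric type, .at = aggregation temporality,
# .im = is_monotonic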
source = '''
tmp = (del(.message) + "}}") ?? null
structured, err = parse_json(tmp)
if err == null && structured.event == "metric" {
# TODO: do this in extract_token instead
.hdx_token = del(structured.fields.__HDX_API_KEY)
.at = to_int(del(structured.fields.metric_aggregation_temporality)) ?? 0
.dt = del(structured.fields.metric_type)
.im = to_bool(del(structured.fields.metric_is_monotonic)) ?? null
.u = del(structured.fields.metric_unit)
filtered_keys = []
for_each(object(structured.fields) ?? {})-> |key, value| {
if is_integer(value) || is_float(value) {
filtered_keys = push(filtered_keys, key)
.n = replace(key, "metric_name:", "")
.v = value
}
}
.tso = to_unix_timestamp(now(), unit: "nanoseconds")
.ts = to_int(structured.time * 1000000000 ?? null) ?? .tso
.b = filter(object(structured.fields) ?? {})-> |key, value| {
!includes(filtered_keys, key)
}
.b.host = structured.host
# Derive extra K8s metrics from "k8s.pod.phase"
if .n == "k8s.pod.phase" {
tmp_metrics = []
tmp_metrics = push(tmp_metrics, .)
tmp_states = [
"pending",
"running",
"succeeded",
"failed",
"unknown"
]
# Create new metrics "k8s.pod.status_phase"
for_each(tmp_states) -> |_index, value| {
_tmp_metric = .
_tmp_metric.n = "k8s.pod.status_phase"
_tmp_metric.v = 0
_tmp_metric.b.state = value
if .v == 1 && value == "pending" {
_tmp_metric.v = 1
} else if .v == 2 && value == "running" {
_tmp_metric.v = 1
} else if .v == 3 && value == "succeeded" {
_tmp_metric.v = 1
} else if .v == 4 && value == "failed" {
_tmp_metric.v = 1
} else if .v == 5 && value == "unknown" {
_tmp_metric.v = 1
}
tmp_metrics = push(tmp_metrics, _tmp_metric)
}
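# e.g. a phase gauge .v == 2 ("running") yields the original metric plus five
# "k8s.pod.status_phase" gauges: running = 1, all other states = 0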
.__hdx_metrics = tmp_metrics
}
}
'''
[transforms.post_metrics_unnest]
type = "remap"
inputs = ["metrics"]
drop_on_abort = true
drop_on_error = true
reroute_dropped = true
source = '''
if is_array(.__hdx_metrics) {
., err = unnest(.__hdx_metrics)
if err != null {
log("unnest failed: " + err, level: "error")
}
}
'''
[transforms.post_metrics]
type = "remap"
inputs = ["post_metrics_unnest"]
drop_on_abort = true
drop_on_error = true
reroute_dropped = true
source = '''
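# after post_metrics_unnest, each event holds one expanded metric under
# .__hdx_metrics; promote it to the event root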
if is_object(.__hdx_metrics) {
tmp = .__hdx_metrics
. = tmp
}
'''
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
# --------------------------------- Debug ----------------------------------------
# --------------------------------------------------------------------------------
[transforms.debug_dropped]
type = "remap"
inputs = [
"logs.dropped",
"post_logs_unnest.dropped",
"post_logs.dropped",
"spans.dropped",
"metrics.dropped",
"post_metrics_unnest.dropped",
"post_metrics.dropped"
]
source = '''
log(., level: "error")
'''
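# To inspect dropped events on stdout, a console sink like the commented-out
# one near the top can be pointed at this transform (sketch, uncomment to enable):
# [sinks.dropped_console]
# type = "console"
# inputs = ["debug_dropped"]
# encoding.codec = "json"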
# --------------------------------------------------------------------------------