Skip to content
This repository was archived by the owner on Nov 5, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 142 additions & 56 deletions metrics/payload/payload.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017 Google Inc.
// Copyright 2017-2020 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// This file defines functions to work with the metrics generated from the
// external probe process output.

// Package payload provides utilities to work with the metrics in payload.
package payload

import (
Expand All @@ -31,16 +29,27 @@ import (

// Parser encapsulates the config for parsing payloads to metrics.
type Parser struct {
defaultEM *metrics.EventMetrics
aggregate bool
l *logger.Logger
baseEM *metrics.EventMetrics
distMetrics map[string]*metrics.Distribution
aggregate bool
l *logger.Logger
}

// NewParser returns a new payload parser, based on the config provided.
func NewParser(opts *configpb.OutputMetricsOptions, ptype, probeName string, defaultKind metrics.Kind, l *logger.Logger) (*Parser, error) {
parser := &Parser{
aggregate: opts.GetAggregateInCloudprober(),
l: l,
aggregate: opts.GetAggregateInCloudprober(),
distMetrics: make(map[string]*metrics.Distribution),
l: l,
}

// If there are any distribution metrics, build them now itself.
for name, distMetric := range opts.GetDistMetric() {
d, err := metrics.NewDistributionFromProto(distMetric)
if err != nil {
return nil, err
}
parser.distMetrics[name] = d
}

em := metrics.NewEventMetrics(time.Now()).
Expand Down Expand Up @@ -70,16 +79,7 @@ func NewParser(opts *configpb.OutputMetricsOptions, ptype, probeName string, def
}
}

// If there are any distribution metrics, build them now itself.
for name, distMetric := range opts.GetDistMetric() {
d, err := metrics.NewDistributionFromProto(distMetric)
if err != nil {
return nil, err
}
em.AddMetric(name, d)
}

parser.defaultEM = em
parser.baseEM = em

return parser, nil
}
Expand All @@ -101,61 +101,147 @@ func updateMetricValue(mv metrics.Value, val string) error {
return mv.Add(v)
}

// PayloadMetrics parses the given payload and either updates the provided
// metrics (if we are aggregating in cloudprober) or returns new metrics (if
// not aggregating or provided metrics are nil)
func (p *Parser) PayloadMetrics(em *metrics.EventMetrics, payload, target string) *metrics.EventMetrics {
// If not initialized yet or not aggregating in cloudprober, initialize
// metrics from the default metrics.
if em == nil || !p.aggregate {
em = p.defaultEM.Clone().AddLabel("dst", target)
func parseMetricNameAndLabels(s string) (string, [][2]string) {
// Check for no labels first.
if s == "" || s[len(s)-1] != '}' {
return s, nil
}
s = s[:len(s)-1]

em.Timestamp = time.Now()
// Split at "{"
parts := strings.SplitN(s, "{", 2)
if len(parts) != 2 {
return s, nil
}
metricName, labelStr := parts[0], parts[1]

var labels [][2]string
for _, l := range strings.Split(labelStr, ",") {
parts := strings.SplitN(l, "=", 2)
if len(parts) != 2 {
continue
}
key, val := parts[0], parts[1]
// Unquote val if it is a quoted string. strconv returns an error if string
// is not quoted at all or is unproperly quoted. We use raw string in that
// case.
uval, err := strconv.Unquote(val)
if err == nil {
val = uval
}
labels = append(labels, [2]string{key, val})
}
return metricName, labels
}

func (p *Parser) metricValueLabels(line string) (metricName, val string, labels [][2]string) {
line = strings.TrimSpace(line)
if len(line) == 0 {
return
}

varKV := strings.Fields(line)
if len(varKV) != 2 {
p.l.Warning("Wrong var key-value format: ", line)
return
}

m, l := parseMetricNameAndLabels(varKV[0])
if p.aggregate && len(l) != 0 {
p.l.Warning("Payload labels are not supported in aggregate_in_cloudprober mode, bad line: ", line)
return
}

return m, varKV[1], l
}

func addNewMetric(em *metrics.EventMetrics, metricName, val string) error {
// New metric name, make sure it's not disallowed.
switch metricName {
case "success", "total", "latency":
return fmt.Errorf("metric name (%s) in the payload conflicts with standard metrics: (success,total,latency), ignoring", metricName)
}

v, err := metrics.ParseValueFromString(val)
if err != nil {
return fmt.Errorf("could not parse value (%s) for the new metric name (%s): %v", val, metricName, err)
}

em.AddMetric(metricName, v)
return nil
}

// PayloadMetrics parses the given payload and creates one EventMetrics per
// line. Each metric line can have its own labels, e.g. num_rows{db=dbA}.
func (p *Parser) PayloadMetrics(payload, target string) []*metrics.EventMetrics {
// Timestamp for all EventMetrics generated from this payload.
payloadTS := time.Now()
var results []*metrics.EventMetrics

// Convert payload variables into metrics. Variables are specified in
// the following format:
// var1 value1
// var2 value2
for _, line := range strings.Split(payload, "\n") {
line = strings.TrimSpace(line)
if len(line) == 0 {
metricName, val, labels := p.metricValueLabels(line)
if metricName == "" {
continue
}

em := p.baseEM.Clone().AddLabel("dst", target)
em.Timestamp = payloadTS
for _, kv := range labels {
em.AddLabel(kv[0], kv[1])
}

// If pre-configured, distribution metric.
if dv, ok := p.distMetrics[metricName]; ok {
d := dv.Clone().(*metrics.Distribution)
processDistValue(d, val)
em.AddMetric(metricName, d)
results = append(results, em)
continue
}

if err := addNewMetric(em, metricName, val); err != nil {
p.l.Warning(err.Error())
continue
}
varKV := strings.Fields(line)
if len(varKV) != 2 {
p.l.Warningf("Wrong var key-value format: %s", line)
results = append(results, em)
}

return results
}

// AggregatedPayloadMetrics parses the given payload and updates the provided
// metrics. If provided payload metrics is nil, we initialize a new one using
// the default values configured at the time of parser creation.
func (p *Parser) AggregatedPayloadMetrics(em *metrics.EventMetrics, payload, target string) *metrics.EventMetrics {
// If not initialized yet, initialize metrics from the default metrics.
if em == nil {
em = p.baseEM.Clone().AddLabel("dst", target)
for m, v := range p.distMetrics {
em.AddMetric(m, v)
}
}

em.Timestamp = time.Now()

for _, line := range strings.Split(payload, "\n") {
metricName, val, _ := p.metricValueLabels(line)
if metricName == "" {
continue
}
metricName := varKV[0]
val := varKV[1]

// If a metric already exists in the EventMetric (it will happen in two cases
// -- a) we are aggregating in cloudprober, or, b) this is a distribution
// metric that is already defined through the config), we simply add the new
// value (after parsing) to it, except for the distributions, which are
// handled in a special manner as their values can be provided in multiple
// ways.
// If a metric already exists in the EventMetric, we simply add the new
// value (after parsing) to it.
if mv := em.Metric(metricName); mv != nil {
if err := updateMetricValue(mv, val); err != nil {
p.l.Warningf("Error updating metric %s with val %s: %v", metricName, val, err)
}
continue
}

// New metric name, make sure it's not disallowed.
switch metricName {
case "success", "total", "latency":
p.l.Warningf("Metric name (%s) in the output conflicts with standard metrics: (success,total,latency). Ignoring.", metricName)
continue
}

v, err := metrics.ParseValueFromString(val)
if err != nil {
p.l.Warningf("Could not parse value (%s) for the new metric name (%s): %v", val, metricName, err)
if err := addNewMetric(em, metricName, val); err != nil {
p.l.Warning(err.Error())
continue
}
em.AddMetric(metricName, v)
}

return em
Expand Down
Loading