Skip to content

Commit

Permalink
Add full support for ECS in Application Signals (#1336)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjrara committed Sep 27, 2024
1 parent 5e49a98 commit ea7a81d
Show file tree
Hide file tree
Showing 20 changed files with 1,042 additions and 115 deletions.
8 changes: 8 additions & 0 deletions plugins/processors/awsapplicationsignals/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ const (
AttributeHost = "Host"
)

// Platform attribute used as CloudWatch EMF log field.
const (
MetricAttributeECSCluster = "ECS.Cluster"
MetricAttributeECSTaskId = "ECS.TaskId"
MetricAttributeECSTaskDefinitionFamily = "ECS.TaskDefinitionFamily"
MetricAttributeECSTaskDefinitionRevision = "ECS.TaskDefinitionRevision"
)

// Telemetry attributes used as CloudWatch EMF log fields.
const (
MetricAttributeTelemetrySDK = "Telemetry.SDK"
Expand Down
4 changes: 1 addition & 3 deletions plugins/processors/awsapplicationsignals/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ func (cfg *Config) Validate() error {
if resolver.Name == "" {
return errors.New("name must not be empty for k8s resolver")
}
case PlatformEC2, PlatformGeneric:
case PlatformECS:
return errors.New("ecs resolver is not supported")
case PlatformEC2, PlatformECS, PlatformGeneric:
default:
return errors.New("unknown resolver")
}
Expand Down
74 changes: 52 additions & 22 deletions plugins/processors/awsapplicationsignals/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,41 @@ import (
)

func TestValidatePassed(t *testing.T) {
config := Config{
Resolvers: []Resolver{NewEKSResolver("test"), NewGenericResolver("")},
Rules: nil,
}
assert.Nil(t, config.Validate())

config = Config{
Resolvers: []Resolver{NewK8sResolver("test"), NewGenericResolver("")},
Rules: nil,
tests := []struct {
name string
resolver Resolver
}{
{
"testEKS",
NewEKSResolver("test"),
},
{
"testK8S",
NewK8sResolver("test"),
},
{
"testEC2",
NewEC2Resolver("test"),
},
{
"testECS",
NewECSResolver("test"),
},
{
"testGeneric",
NewGenericResolver("test"),
},
}
assert.Nil(t, config.Validate())
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := Config{
Resolvers: []Resolver{tt.resolver},
Rules: nil,
}
assert.Nil(t, config.Validate())

config = Config{
Resolvers: []Resolver{NewEC2Resolver("test"), NewGenericResolver("")},
Rules: nil,
})
}
assert.Nil(t, config.Validate())
}

func TestValidateFailedOnEmptyResolver(t *testing.T) {
Expand All @@ -38,15 +56,27 @@ func TestValidateFailedOnEmptyResolver(t *testing.T) {
}

func TestValidateFailedOnEmptyResolverName(t *testing.T) {
config := Config{
Resolvers: []Resolver{NewEKSResolver("")},
Rules: nil,
tests := []struct {
name string
resolver Resolver
}{
{
"testEKS",
NewEKSResolver(""),
},
{
"testK8S",
NewK8sResolver(""),
},
}
assert.NotNil(t, config.Validate())
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := Config{
Resolvers: []Resolver{tt.resolver},
Rules: nil,
}
assert.NotNil(t, config.Validate())

config = Config{
Resolvers: []Resolver{NewK8sResolver("")},
Rules: nil,
})
}
assert.NotNil(t, config.Validate())
}
7 changes: 7 additions & 0 deletions plugins/processors/awsapplicationsignals/config/resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ func NewEC2Resolver(name string) Resolver {
}
}

func NewECSResolver(name string) Resolver {
return Resolver{
Name: name,
Platform: PlatformECS,
}
}

func NewGenericResolver(name string) Resolver {
return Resolver{
Name: name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const (
AWSRemoteDbUser = "aws.remote.db.user"
AWSRemoteResourceCfnPrimaryIdentifier = "aws.remote.resource.cfn.primary.identifier"

AWSECSClusterName = "aws.ecs.cluster.name"
AWSECSTaskID = "aws.ecs.task.id"

// resource detection processor attributes
ResourceDetectionHostId = "host.id"
ResourceDetectionHostName = "host.name"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ var attributesRenamingForMetric = map[string]string{
attr.AWSRemoteResourceType: common.CWMetricAttributeRemoteResourceType,
attr.AWSRemoteDbUser: common.MetricAttributeRemoteDbUser,
attr.AWSRemoteResourceCfnPrimaryIdentifier: common.MetricAttributeRemoteResourceCfnPrimaryIdentifier,
attr.AWSECSClusterName: common.MetricAttributeECSCluster,
attr.AWSECSTaskID: common.MetricAttributeECSTaskId,
}

var resourceAttributesRenamingForTrace = map[string]string{
// these kubernetes resource attributes are set by the openTelemetry operator
// these kubernetes resource attributes are set by the OpenTelemetry operator
// see the code references from upstream:
// * https://github.com/open-telemetry/opentelemetry-operator/blob/0e39ee77693146e0924da3ca474a0fe14dc30b3a/pkg/instrumentation/sdk.go#L245
// * https://github.com/open-telemetry/opentelemetry-operator/blob/0e39ee77693146e0924da3ca474a0fe14dc30b3a/pkg/instrumentation/sdk.go#L305C43-L305C43
Expand All @@ -61,9 +63,9 @@ var attributesRenamingForTrace = map[string]string{
attr.AWSRemoteTarget: attr.AWSRemoteResourceIdentifier,
}

var copyMapForMetric = map[string]string{
// these kubernetes resource attributes are set by the openTelemtry operator
// see the code referecnes from upstream:
var resourceToMetricAttributes = map[string]string{
// these kubernetes resource attributes are set by the OpenTelemetry operator
// see the code references from upstream:
// * https://github.com/open-telemetry/opentelemetry-operator/blob/0e39ee77693146e0924da3ca474a0fe14dc30b3a/pkg/instrumentation/sdk.go#L245
// * https://github.com/open-telemetry/opentelemetry-operator/blob/0e39ee77693146e0924da3ca474a0fe14dc30b3a/pkg/instrumentation/sdk.go#L305C43-L305C43
semconv.AttributeK8SDeploymentName: common.AttributeK8SWorkload,
Expand All @@ -73,6 +75,8 @@ var copyMapForMetric = map[string]string{
semconv.AttributeK8SCronJobName: common.AttributeK8SWorkload,
semconv.AttributeK8SPodName: common.AttributeK8SPod,
semconv.AttributeAWSLogGroupNames: "aws.log.group.names",
semconv.AttributeAWSECSTaskRevision: common.MetricAttributeECSTaskDefinitionRevision,
semconv.AttributeAWSECSTaskFamily: common.MetricAttributeECSTaskDefinitionFamily,
}

const (
Expand Down Expand Up @@ -107,7 +111,7 @@ func (n *attributesNormalizer) copyResourceAttributesToAttributes(attributes, re
if isTrace {
return
}
for k, v := range copyMapForMetric {
for k, v := range resourceToMetricAttributes {
if resourceAttrValue, ok := resourceAttributes.Get(k); ok {
// print some debug info when an attribute value is overwritten
if originalAttrValue, ok := attributes.Get(k); ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestCopyResourceAttributesToAttributes(t *testing.T) {

// Create a pcommon.Map for resourceAttributes with some attributes
resourceAttributes := pcommon.NewMap()
for resourceAttrKey, attrKey := range copyMapForMetric {
for resourceAttrKey, attrKey := range resourceToMetricAttributes {
resourceAttributes.PutStr(resourceAttrKey, attrKey+"-value")
}
resourceAttributes.PutStr("host.id", "i-01ef7d37f42caa168")
Expand All @@ -98,7 +98,7 @@ func TestCopyResourceAttributesToAttributes(t *testing.T) {
normalizer.copyResourceAttributesToAttributes(attributes, resourceAttributes, false)

// Check that the attribute has been copied correctly
for _, attrKey := range copyMapForMetric {
for _, attrKey := range resourceToMetricAttributes {
if value, ok := attributes.Get(attrKey); !ok || value.AsString() != attrKey+"-value" {
t.Errorf("Attribute was not copied correctly: got %v, want %v", value.AsString(), attrKey+"-value")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"errors"
"fmt"
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/collector/semconv/v1.22.0"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common"
appsignalsconfig "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config"
attr "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/attributes"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
)

const (
Expand All @@ -25,6 +23,7 @@ const (
AttributePlatformGeneric = "Generic"
AttributePlatformEC2 = "AWS::EC2"
AttributePlatformEKS = "AWS::EKS"
AttributePlatformECS = "AWS::ECS"
AttributePlatformK8S = "K8s"
)

Expand Down Expand Up @@ -59,12 +58,10 @@ func NewAttributesResolver(resolvers []appsignalsconfig.Resolver, logger *zap.Lo
subResolvers = append(subResolvers, getKubernetesResolver(resolver.Platform, resolver.Name, logger), newKubernetesResourceAttributesResolver(resolver.Platform, resolver.Name))
case appsignalsconfig.PlatformEC2:
subResolvers = append(subResolvers, newResourceAttributesResolver(resolver.Platform, AttributePlatformEC2, DefaultInheritedAttributes))
case appsignalsconfig.PlatformECS:
subResolvers = append(subResolvers, newECSResourceAttributesResolver(resolver.Platform, resolver.Name))
default:
if ecsutil.GetECSUtilSingleton().IsECS() {
subResolvers = append(subResolvers, newResourceAttributesResolver(appsignalsconfig.PlatformECS, AttributePlatformGeneric, DefaultInheritedAttributes))
} else {
subResolvers = append(subResolvers, newResourceAttributesResolver(resolver.Platform, AttributePlatformGeneric, GenericInheritedAttributes))
}
subResolvers = append(subResolvers, newResourceAttributesResolver(resolver.Platform, AttributePlatformGeneric, GenericInheritedAttributes))
}
}
return &attributesResolver{
Expand Down Expand Up @@ -121,40 +118,15 @@ func getLocalEnvironment(attributes, resourceAttributes pcommon.Map, defaultEnvP
if val, found := resourceAttributes.Get(attr.AWSHostedInEnvironment); found {
return val.Str()
}
if defaultEnvPrefix == appsignalsconfig.PlatformECS {
if clusterName, _ := getECSClusterName(resourceAttributes); clusterName != "" {
return getDefaultEnvironment(defaultEnvPrefix, clusterName)
}
if clusterName := ecsutil.GetECSUtilSingleton().Cluster; clusterName != "" {
return getDefaultEnvironment(defaultEnvPrefix, clusterName)
}
} else if defaultEnvPrefix == appsignalsconfig.PlatformEC2 {
if defaultEnvPrefix == appsignalsconfig.PlatformEC2 {
if asgAttr, found := resourceAttributes.Get(attr.ResourceDetectionASG); found {
return getDefaultEnvironment(defaultEnvPrefix, asgAttr.Str())
}
}
return getDefaultEnvironment(defaultEnvPrefix, AttributeEnvironmentDefault)
}

func getECSClusterName(resourceAttributes pcommon.Map) (string, bool) {
if clusterAttr, ok := resourceAttributes.Get(semconv.AttributeAWSECSClusterARN); ok {
parts := strings.Split(clusterAttr.Str(), "/")
clusterName := parts[len(parts)-1]
return clusterName, true
} else if taskAttr, ok := resourceAttributes.Get(semconv.AttributeAWSECSTaskARN); ok {
parts := strings.SplitAfterN(taskAttr.Str(), ":task/", 2)
if len(parts) == 2 {
taskParts := strings.Split(parts[1], "/")
// cluster name in ARN
if len(taskParts) == 2 {
return taskParts[0], true
}
return generateLocalEnvironment(defaultEnvPrefix, asgAttr.Str())
}
}
return "", false
return generateLocalEnvironment(defaultEnvPrefix, AttributeEnvironmentDefault)
}

func getDefaultEnvironment(platformCode, val string) string {
func generateLocalEnvironment(platformCode, val string) string {
return fmt.Sprintf("%s:%s", platformCode, val)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,6 @@ func TestResourceAttributesResolverWithNoConfiguredName(t *testing.T) {
}
}

func TestResourceAttributesResolverWithECSClusterName(t *testing.T) {
resolver := resourceAttributesResolver{
defaultEnvPrefix: "ecs",
platformType: "Generic",
attributeMap: DefaultInheritedAttributes,
}

attributes := pcommon.NewMap()
resourceAttributes := pcommon.NewMap()
resourceAttributes.PutStr(semconv.AttributeAWSECSTaskARN, "arn:aws:ecs:us-west-1:123456789123:task/my-cluster/10838bed-421f-43ef-870a-f43feacbbb5b")

resolver.Process(attributes, resourceAttributes)

attribute, ok := attributes.Get(common.AttributePlatformType)
assert.True(t, ok)
assert.Equal(t, "Generic", attribute.Str())

attribute, ok = attributes.Get(attr.AWSLocalEnvironment)
assert.True(t, ok)
assert.Equal(t, "ecs:my-cluster", attribute.Str())
}

func TestResourceAttributesResolverWithOnEC2WithASG(t *testing.T) {
logger, _ := zap.NewDevelopment()
attributesResolver := NewAttributesResolver([]config.Resolver{config.NewEC2Resolver("")}, logger)
Expand Down Expand Up @@ -226,22 +204,3 @@ func TestAttributesResolver_Stop(t *testing.T) {
mockSubResolver1.AssertExpectations(t)
mockSubResolver2.AssertExpectations(t)
}

func TestGetClusterName(t *testing.T) {
resourceAttributes := pcommon.NewMap()
resourceAttributes.PutStr(semconv.AttributeAWSECSClusterARN, "arn:aws:ecs:us-west-2:123456789123:cluster/my-cluster")
clusterName, ok := getECSClusterName(resourceAttributes)
assert.True(t, ok)
assert.Equal(t, "my-cluster", clusterName)

resourceAttributes = pcommon.NewMap()
resourceAttributes.PutStr(semconv.AttributeAWSECSTaskARN, "arn:aws:ecs:us-west-1:123456789123:task/10838bed-421f-43ef-870a-f43feacbbb5b")
_, ok = getECSClusterName(resourceAttributes)
assert.False(t, ok)

resourceAttributes = pcommon.NewMap()
resourceAttributes.PutStr(semconv.AttributeAWSECSTaskARN, "arn:aws:ecs:us-west-1:123456789123:task/my-cluster/10838bed-421f-43ef-870a-f43feacbbb5b")
clusterName, ok = getECSClusterName(resourceAttributes)
assert.True(t, ok)
assert.Equal(t, "my-cluster", clusterName)
}
Loading

0 comments on commit ea7a81d

Please sign in to comment.