diff --git a/publisher/kinesis/kinesis_test.go b/publisher/kinesis/kinesis_test.go index 0b6ca06..8460ed5 100644 --- a/publisher/kinesis/kinesis_test.go +++ b/publisher/kinesis/kinesis_test.go @@ -6,18 +6,48 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" pb "github.com/raystack/raccoon/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) func TestKinesisProducer_UnitTest(t *testing.T) { + events := []*pb.Event{ + { + Type: "unknown", + }, + } t.Run("should return an error if stream existence check fails", func(t *testing.T) { - events := []*pb.Event{ - { - Type: "unknown", + client := &mockKinesisClient{} + + client.On( + "DescribeStreamSummary", + mock.Anything, + &kinesis.DescribeStreamSummaryInput{ + StreamName: aws.String("unknown"), }, + mock.Anything, + ).Return( + &kinesis.DescribeStreamSummaryOutput{}, + fmt.Errorf("simulated error"), + ).Once() + defer client.AssertExpectations(t) + + p, err := New( + nil, // we will override it later + WithStreamAutocreate(true), + ) + if err != nil { + t.Errorf("error constructing client: %v", err) + return } + p.client = client + + err = p.ProduceBulk(events, "") + assert.Error(t, err, "error when sending message: simulated error") + }) + t.Run("should return an error if stream creation exceeds resource limit", func(t *testing.T) { client := &mockKinesisClient{} client.On( @@ -29,9 +59,18 @@ func TestKinesisProducer_UnitTest(t *testing.T) { mock.Anything, ).Return( &kinesis.DescribeStreamSummaryOutput{}, - fmt.Errorf("simulated error"), + &types.ResourceNotFoundException{}, ).Once() + client.On("CreateStream", mock.Anything, mock.Anything, mock.Anything). + Return( + &kinesis.CreateStreamOutput{}, + &types.LimitExceededException{ + Message: aws.String("stream limit reached"), + }, + ).Once() + defer client.AssertExpectations(t) + p, err := New( nil, // we will override it later WithStreamAutocreate(true), @@ -43,6 +82,29 @@ func TestKinesisProducer_UnitTest(t *testing.T) { p.client = client err = p.ProduceBulk(events, "") - assert.NotNil(t, err) + assert.Error(t, err, "error when sending messages: LimitExceededException: stream limit reached") + }) + t.Run("should return an error if rate limit is exceeded", func(t *testing.T) { + + client := &mockKinesisClient{} + + client.On("PutRecord", mock.Anything, mock.Anything, mock.Anything). + Return( + &kinesis.PutRecordOutput{}, + &types.ProvisionedThroughputExceededException{ + Message: aws.String("put limit exceeded"), + }, + ).Once() + defer client.AssertExpectations(t) + + p, err := New(nil) + if err != nil { + t.Errorf("error constructing client: %v", err) + return + } + p.client = client + + err = p.ProduceBulk(events, "") + assert.Error(t, err, "error when sending messages: ProvisionedThroughputExceededException: put limit exceeded") }) }