Skip to content

Commit

Permalink
Add configuration option to start from timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Sep 8, 2021
1 parent 3f361bd commit 41f1925
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type Config struct {

// Delay between tests for the client or shard numbers changing
shardCheckFrequency time.Duration

// Starting timestamp of the shard iterator, if "AT_TIMESTAMP" is the desired iterator type
iteratorStartTimestamp *time.Time
// ---------- [ For the leader (first client alphabetically) ] ----------
// Time between leader actions
leaderActionFrequency time.Duration
Expand Down Expand Up @@ -105,6 +108,12 @@ func (c Config) WithStats(stats StatReceiver) Config {
return c
}

// WithIteratorStartTimestamp returns a Config with a modified iteratorStartTimestamp
func (c Config) WithIteratorStartTimestamp(timestamp *time.Time) Config {
c.iteratorStartTimestamp = timestamp
return c
}

// WithDynamoReadCapacity returns a Config with a modified dynamo read capacity
func (c Config) WithDynamoReadCapacity(readCapacity int64) Config {
c.dynamoReadCapacity = readCapacity
Expand Down
6 changes: 5 additions & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func TestConfigDefault(t *testing.T) {
config := NewConfig()
err := validateConfig(&config)
require.NoError(t, err)
require.Nil(t, config.iteratorStartTimestamp)
}

func TestConfigErrors(t *testing.T) {
Expand Down Expand Up @@ -55,13 +56,15 @@ func TestConfigErrors(t *testing.T) {

func TestConfigWithMethods(t *testing.T) {
stats := &NoopStatReceiver{}
tstamp := time.Now()
config := NewConfig().
WithBufferSize(1).
WithCommitFrequency(1 * time.Second).
WithShardCheckFrequency(1 * time.Second).
WithLeaderActionFrequency(1 * time.Second).
WithThrottleDelay(1 * time.Second).
WithStats(stats)
WithStats(stats).
WithIteratorStartTimestamp(&tstamp)

err := validateConfig(&config)
require.NoError(t, err)
Expand All @@ -72,4 +75,5 @@ func TestConfigWithMethods(t *testing.T) {
require.Equal(t, 1*time.Second, config.shardCheckFrequency)
require.Equal(t, 1*time.Second, config.leaderActionFrequency)
require.Equal(t, stats, config.stats)
require.Equal(t, &tstamp, config.iteratorStartTimestamp)
}
10 changes: 7 additions & 3 deletions shard_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ const (
)

// getShardIterator gets a shard iterator after the last sequence number we read or at the start of the stream
func getShardIterator(k kinesisiface.KinesisAPI, streamName string, shardID string, sequenceNumber string) (string, error) {
func getShardIterator(k kinesisiface.KinesisAPI, streamName string, shardID string, sequenceNumber string, iteratorStartTimestamp *time.Time) (string, error) {
shardIteratorType := kinesis.ShardIteratorTypeAfterSequenceNumber

// If we do not have a sequenceNumber yet we need to get a shardIterator
// from the horizon
ps := aws.String(sequenceNumber)
if sequenceNumber == "" {
if sequenceNumber == "" && iteratorStartTimestamp != nil {
shardIteratorType = kinesis.ShardIteratorTypeAtTimestamp
ps = nil
} else if sequenceNumber == "" {
shardIteratorType = kinesis.ShardIteratorTypeTrimHorizon
ps = nil
} else if sequenceNumber == "LATEST" {
Expand All @@ -47,6 +50,7 @@ func getShardIterator(k kinesisiface.KinesisAPI, streamName string, shardID stri
ShardIteratorType: &shardIteratorType,
StartingSequenceNumber: ps,
StreamName: aws.String(streamName),
Timestamp: iteratorStartTimestamp,
})
return aws.StringValue(resp.ShardIterator), err
}
Expand Down Expand Up @@ -140,7 +144,7 @@ func (k *Kinsumer) consume(shardID string) {
}()

// Get the starting shard iterator
iterator, err := getShardIterator(k.kinesis, k.streamName, shardID, sequenceNumber)
iterator, err := getShardIterator(k.kinesis, k.streamName, shardID, sequenceNumber, k.config.iteratorStartTimestamp)
if err != nil {
k.shardErrors <- shardConsumerError{shardID: shardID, action: "getShardIterator", err: err}
return
Expand Down

0 comments on commit 41f1925

Please sign in to comment.