Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add HOT-RELOAD feature and tests #261

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions app/cmd/goaws.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
func main() {
var filename string
var debug bool
var hotReload bool
flag.StringVar(&filename, "config", "", "config file location + name")
flag.BoolVar(&debug, "debug", false, "debug log level (default Warning)")
flag.BoolVar(&hotReload, "hot-reload", false, "enable hot reload of config file for creation of new sqs queues and sns topics (default false)")
flag.Parse()

log.SetFormatter(&log.JSONFormatter{})
Expand Down Expand Up @@ -53,6 +55,11 @@ func main() {
quit := make(chan struct{}, 0)
go gosqs.PeriodicTasks(1*time.Second, quit)

//start config watcher
if hotReload {
go conf.StartWatcher(filename, env)
}

if len(portNumbers) == 1 {
log.Warnf("GoAws listening on: 0.0.0.0:%s", portNumbers[0])
err := http.ListenAndServe("0.0.0.0:"+portNumbers[0], r)
Expand Down
178 changes: 143 additions & 35 deletions app/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package conf
import (
"encoding/json"
"fmt"

"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
Expand All @@ -13,12 +15,14 @@ import (

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/common"
"github.com/fsnotify/fsnotify"
"github.com/ghodss/yaml"
)

var envs map[string]app.Environment

func LoadYamlConfig(filename string, env string) []string {

ports := []string{"4100"}

if filename == "" {
Expand Down Expand Up @@ -79,8 +83,144 @@ func LoadYamlConfig(filename string, env string) []string {
app.CurrentEnvironment.Port = "4100"
}

app.SyncQueues.Lock()
err = createSqsQueues(env)
if err != nil {
return ports
}

err = createSNSTopics(env)
if err != nil {
return ports
}

return ports
}

func StartWatcher(filename string, env string) {
quit := make(chan struct{})
//create watcher
if filename == "" {
filename, _ = filepath.Abs("./app/conf/goaws.yaml")
}
log.Infof("Starting watcher on file: %v", filename)

watcher, err := fsnotify.NewWatcher()
defer watcher.Close()

if err != nil {
log.Errorf("err: %s", err)
}

// Start listening for events.
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Has(fsnotify.Remove) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at this a bit deeper, I don't think I fully understand this block. Is it because in your environment - this "remove" event is what happens on a file change (in a "remove" then "recreate" pattern)? If so, the file name isn't changing is it? Could we not just do the wait/loop to check if it's rebuilt and then proceed down with the rest of the reloading calls?

I think I'm confused about 2 things really.

  1. Why call StartWatcher again, and synchronously?
  2. Why block the outer function with the quit channel? In most of these non-k8 with the remove event cases would that ever actually hang up or is that goroutine just orphaned now?

//wait for file recreation
//REMOVE are used in k8s environment by configmap
for {
log.Infof("Waiting for file to be created: %s", filename)
time.Sleep(2 * time.Second)
_, err := os.Stat(filename)
if err == nil {
log.Infof("file created: %s", filename)
defer StartWatcher(filename, env)
close(quit)
break
}
}
} else if !event.Has(fsnotify.Write) {
//discard non-Write events
continue
}
log.Infof("Reloading config file: %s", filename)

yamlFile, err := os.ReadFile(filename)
if err != nil {
log.Errorf("err: %s", err)
return
}

err = yaml.Unmarshal(yamlFile, &envs)
if err != nil {
log.Errorf("err: %s", err)
return
}

log.Infoln("Load new SQS config:")
err = createSqsQueues(env)
Admiral-Piett marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Errorf("err: %s", err)
return
}
log.Infoln("Load new SNS config:")
err = createSNSTopics(env)
if err != nil {
log.Errorf("err: %s", err)
return
}
case err, ok := <-watcher.Errors:
if !ok {
log.Errorf("err: %s", err)
return
}
log.Println("error:", err)
}
}
}()

//add watcher
log.Debugf("Started watcher to filename: %s", filename)
err = watcher.Add(filename)
if err != nil {
log.Errorf("err: %s", err)
}

//block goroutine until end of main execution
<-quit

}

func createSNSTopics(env string) error {
app.SyncTopics.Lock()
for _, topic := range envs[env].Topics {
topicArn := "arn:aws:sns:" + app.CurrentEnvironment.Region + ":" + app.CurrentEnvironment.AccountID + ":" + topic.Name

newTopic := &app.Topic{Name: topic.Name, Arn: topicArn}
newTopic.Subscriptions = make([]*app.Subscription, 0, 0)

for _, subs := range topic.Subscriptions {
var newSub *app.Subscription
if strings.Contains(subs.Protocol, "http") {
newSub = createHttpSubscription(subs)
} else {
//Queue does not exist yet, create it.
newSub = createSqsSubscription(subs, topicArn)
}
if subs.FilterPolicy != "" {
filterPolicy := &app.FilterPolicy{}
err := json.Unmarshal([]byte(subs.FilterPolicy), filterPolicy)
if err != nil {
log.Errorf("err: %s", err)
return err
}
newSub.FilterPolicy = filterPolicy
}

newTopic.Subscriptions = append(newTopic.Subscriptions, newSub)
}
app.SyncTopics.Topics[topic.Name] = newTopic
}
app.SyncTopics.Unlock()
return nil
}

func createSqsQueues(env string) error {
app.SyncQueues.Lock()
for _, queue := range envs[env].Queues {
queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port +
"/" + app.CurrentEnvironment.AccountID + "/" + queue.Name
Expand Down Expand Up @@ -117,45 +257,13 @@ func LoadYamlConfig(filename string, env string) []string {
err := setQueueRedrivePolicy(app.SyncQueues.Queues, q, queue.RedrivePolicy)
if err != nil {
log.Errorf("err: %s", err)
return ports
}
}

}

for _, topic := range envs[env].Topics {
topicArn := "arn:aws:sns:" + app.CurrentEnvironment.Region + ":" + app.CurrentEnvironment.AccountID + ":" + topic.Name

newTopic := &app.Topic{Name: topic.Name, Arn: topicArn}
newTopic.Subscriptions = make([]*app.Subscription, 0, 0)

for _, subs := range topic.Subscriptions {
var newSub *app.Subscription
if strings.Contains(subs.Protocol, "http") {
newSub = createHttpSubscription(subs)
} else {
//Queue does not exist yet, create it.
newSub = createSqsSubscription(subs, topicArn)
return err
}
if subs.FilterPolicy != "" {
filterPolicy := &app.FilterPolicy{}
err = json.Unmarshal([]byte(subs.FilterPolicy), filterPolicy)
if err != nil {
log.Errorf("err: %s", err)
return ports
}
newSub.FilterPolicy = filterPolicy
}

newTopic.Subscriptions = append(newTopic.Subscriptions, newSub)
}
app.SyncTopics.Topics[topic.Name] = newTopic
}

app.SyncQueues.Unlock()
app.SyncTopics.Unlock()

return ports
return nil
}

func createHttpSubscription(configSubscription app.EnvSubsciption) *app.Subscription {
Expand Down
64 changes: 62 additions & 2 deletions app/conf/config_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package conf

import (
"testing"

"github.com/Admiral-Piett/goaws/app"
"os"
"testing"
"time"
)

func TestConfig_NoQueuesOrTopics(t *testing.T) {
Expand Down Expand Up @@ -120,3 +121,62 @@ func TestConfig_NoQueueAttributeDefaults(t *testing.T) {
t.Errorf("Expected local-queue2 Queue to be configured with ReceiveMessageWaitTimeSeconds: 20 but got %d\n", receiveWaitTime)
}
}

func TestConfig_HotReloadConfigFile(t *testing.T) {

env := "Local"
port := LoadYamlConfig("./mock-data/mock-config.yaml", env)

//start a watcher goroutine
go StartWatcher("./mock-data/mock-config.yaml", env)
time.Sleep(5 * time.Second)

if port[0] != "4100" {
t.Errorf("Expected port number 4100 but got %s\n", port)
}

//backup mock-config.yaml
backupFile, err := os.ReadFile("./mock-data/mock-config.yaml")
if err != nil {
t.Errorf("Error backuping mock-config.yaml: %v\n", err)
}

//read mock-updated-config.yaml
updatedFile, err := os.ReadFile("./mock-data/mock-updated-config.yaml")
if err != nil {
t.Errorf("Error reading mock-updated-config.yaml: %v\n", err)
}

//write updatedFile on mock-config.yaml
err = os.WriteFile("./mock-data/mock-config.yaml", updatedFile, 0644)
if err != nil {
t.Errorf("Error updating mock-config.yaml: %v\n", err)
}

time.Sleep(5 * time.Second)

//make tests checks
numQueues := len(envs[env].Queues)
if numQueues != 5 {
t.Errorf("Expected five queues to be in the environment but got %d\n", numQueues)
}
numQueues = len(app.SyncQueues.Queues)
if numQueues != 8 {
t.Errorf("Expected eight queues to be in the sqs topics but got %d\n", numQueues)
}

numTopics := len(envs[env].Topics)
if numTopics != 3 {
t.Errorf("Expected two topics to be in the environment but got %d\n", numTopics)
}
numTopics = len(app.SyncTopics.Topics)
if numTopics != 3 {
t.Errorf("Expected two topics to be in the sns topics but got %d\n", numTopics)
}

//restore mock-config.yaml
err = os.WriteFile("./mock-data/mock-config.yaml", backupFile, 0644)
if err != nil {
t.Errorf("Error updating mock-config.yaml: %v\n", err)
}
}
52 changes: 52 additions & 0 deletions app/conf/mock-data/mock-updated-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
Local: # Environment name that can be passed on the command line
# (i.e.: ./goaws [Local | Dev] -- defaults to 'Local')
Host: localhost # hostname of the goaws system (for docker-compose this is the tag name of the container)
Port: 4100 # port to listen on.
Region: us-east-1
AccountId: "100010001000"
LogMessages: true # Log messages (true/false)
LogFile: ./goaws_messages.log # Log filename (for message logging
QueueAttributeDefaults: # default attributes for all queues
VisibilityTimeout: 10 # message visibility timeout
ReceiveMessageWaitTimeSeconds: 10 # receive message max wait time
MaximumMessageSize: 1024 # maximum message size (bytes)
Queues: # List of queues to create at startup
- Name: local-queue1 # Queue name
- Name: local-queue2 # Queue name
ReceiveMessageWaitTimeSeconds: 20 # Queue receive message max wait time
MaximumMessageSize: 128 # Queue maximum message size (bytes)
- Name: local-queue3 # Queue name
RedrivePolicy: '{"maxReceiveCount": 100, "deadLetterTargetArn":"arn:aws:sqs:us-east-1:000000000000:local-queue3-dlq"}'
- Name: local-queue3-dlq # Queue name
- Name: local-queue6 # Queue name
Topics: # List of topic to create at startup
- Name: local-topic1 # Topic name - with some Subscriptions
Subscriptions: # List of Subscriptions to create for this topic (queues will be created as required)
- QueueName: local-queue4 # Queue name
Raw: false # Raw message delivery (true/false)
- QueueName: local-queue5 # Queue name
Raw: true # Raw message delivery (true/false)
FilterPolicy: '{"foo":["bar"]}' # Subscription's FilterPolicy, json like a string
- Name: local-topic2 # Topic name - no Subscriptions
- Name: local-topic3 # Topic name - no Subscriptions
Subscriptions: # List of Subscriptions to create for this topic (queues will be created as required)
- QueueName: local-queue7 # Queue name
Raw: false

NoQueuesOrTopics: # Another environment
Host: localhost
Port: 4100
LogMessages: true
LogFile: ./goaws_messages.log
Region: eu-west-1

NoQueueAttributeDefaults:
Host: localhost
Port: 4100
LogMessages: true
LogFile: ./goaws_messages.log
Region: eu-west-1
Queues:
- Name: local-queue1
- Name: local-queue2
ReceiveMessageWaitTimeSeconds: 20
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.18

require (
github.com/aws/aws-sdk-go v1.34.0
github.com/fsnotify/fsnotify v1.6.0
github.com/ghodss/yaml v1.0.0
github.com/gorilla/mux v1.8.0
github.com/sirupsen/logrus v1.9.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/aws/aws-sdk-go v1.34.0/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand All @@ -28,6 +30,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down