From 5a2554ed57e8e05e95aa70970e1539bcda6450c1 Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Mon, 21 Sep 2020 21:24:10 +0200 Subject: [PATCH] Unclutter Module struct and refactor events --- modules/events.go | 221 ++++++++++++++++++++-------------- modules/flags.go | 2 +- modules/mgmt.go | 2 +- modules/microtasks.go | 4 +- modules/modules.go | 174 ++++++++++++++------------ modules/status.go | 6 +- modules/tasks.go | 28 ++--- modules/tasks_test.go | 1 + modules/worker.go | 8 +- notifications/notification.go | 4 + 10 files changed, 258 insertions(+), 192 deletions(-) diff --git a/modules/events.go b/modules/events.go index 54e70aa3..7de0419f 100644 --- a/modules/events.go +++ b/modules/events.go @@ -4,43 +4,80 @@ import ( "context" "errors" "fmt" + "sync" "github.com/safing/portbase/log" ) -type eventHookFn func(context.Context, interface{}) error +type ( + // EventObserverFunc can be registered for one or more event types + // and will be called with the event payload. + // Any error returned from the observer function will be logged. + EventObserverFunc func(context.Context, interface{}) error -type eventHook struct { - description string - hookingModule *Module - hookFn eventHookFn -} + // eventHooks keeps track of registered event subscriptions. + eventHooks struct { + sync.RWMutex + subscriptions map[string][]*subscription + } -// TriggerEvent executes all hook functions registered to the specified event. -func (m *Module) TriggerEvent(event string, data interface{}) { - if m.OnlineSoon() { - go m.processEventTrigger(event, data) + // subscription defines the subscription to an observable. + // Any time the observable emits an event the subscriptions + // callback is called. + subscription struct { + // description is a human readable description of the + // subscription purpose. This is mainly used for logging + // purposes. + description string + // subscriber is a reference to the module that placed + // the subscription. + subscriber *Module + // target holds a reference to the module that is + // observed by this subscription + target *Module + // callback is the function to execute when the observed + // event occurs. + callback EventObserverFunc } -} +) -func (m *Module) processEventTrigger(event string, data interface{}) { - m.eventHooksLock.RLock() - defer m.eventHooksLock.RUnlock() +// RegisterEvent registers a new event to allow for registering hooks. +func (m *Module) RegisterEvent(event string) { + m.events.defineEvent(event) +} - hooks, ok := m.eventHooks[event] - if !ok { - log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, event) - return +// RegisterEventHook registers a hook function with (another) modules' +// event. Whenever a hook is triggered and the receiving module has not +// yet fully started, hook execution will be delayed until the modules +// completed starting. +func (m *Module) RegisterEventHook(module, event, description string, fn EventObserverFunc) error { + targetModule := m + if module != m.Name { + var ok bool + // TODO(ppacher): accessing modules[module] here without any + // kind of protection seems wrong.... Check with + // @dhaavi. + targetModule, ok = modules[module] + if !ok { + return fmt.Errorf(`module "%s" does not exist`, module) + } } - for _, hook := range hooks { - if hook.hookingModule.OnlineSoon() { - go m.runEventHook(hook, event, data) - } + return targetModule.events.addSubscription(targetModule, m, event, description, fn) +} + +// TriggerEvent executes all hook functions registered to the +// specified event. +func (m *Module) TriggerEvent(event string, data interface{}) { + if m.OnlineSoon() { + go m.processEventTrigger(event, event, data) } } -// InjectEvent triggers an event from a foreign module and executes all hook functions registered to that event. +// InjectEvent triggers an event from a foreign module and executes +// all hook functions registered to that event. +// Note that sourceEventName is only used for logging purposes while +// targetModuleName and targetEventName must actually exist. func (m *Module) InjectEvent(sourceEventName, targetModuleName, targetEventName string, data interface{}) error { if !m.OnlineSoon() { return errors.New("module not yet started") @@ -55,99 +92,105 @@ func (m *Module) InjectEvent(sourceEventName, targetModuleName, targetEventName return fmt.Errorf(`module "%s" does not exist`, targetModuleName) } - targetModule.eventHooksLock.RLock() - defer targetModule.eventHooksLock.RUnlock() + targetModule.processEventTrigger(targetEventName, sourceEventName, data) - targetHooks, ok := targetModule.eventHooks[targetEventName] + return nil +} + +func (m *Module) processEventTrigger(eventID, eventName string, data interface{}) { + m.events.RLock() + defer m.events.RUnlock() + + hooks, ok := m.events.subscriptions[eventID] if !ok { - return fmt.Errorf(`module "%s" has no event named "%s"`, targetModuleName, targetEventName) + log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, eventID) + return } - for _, hook := range targetHooks { - if hook.hookingModule.OnlineSoon() { - go m.runEventHook(hook, sourceEventName, data) + for _, hook := range hooks { + if hook.subscriber.OnlineSoon() { + go hook.runEventHook(eventName, data) } } +} - return nil +func (hook *subscription) Name(event string) string { + return fmt.Sprintf("event hook %s/%s -> %s/%s", hook.target.Name, event, hook.subscriber.Name, hook.description) } -func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) { - // check if source module is ready for handling - if m.Status() != StatusOnline { - // target module has not yet fully started, wait until start is complete - select { - case <-m.StartCompleted(): - // continue with hook execution - case <-hook.hookingModule.Stopping(): - return - case <-m.Stopping(): - return - } +func waitForModule(ctx context.Context, m *Module) bool { + select { + case <-ctx.Done(): + return false + case <-m.StartCompleted(): + return true } +} + +func (hook *subscription) runEventHook(event string, data interface{}) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // check if destionation module is ready for handling - if hook.hookingModule.Status() != StatusOnline { - // target module has not yet fully started, wait until start is complete + go func() { select { - case <-hook.hookingModule.StartCompleted(): - // continue with hook execution - case <-hook.hookingModule.Stopping(): - return - case <-m.Stopping(): - return + case <-hook.subscriber.Stopping(): + cancel() + case <-hook.target.Stopping(): + cancel() + case <-ctx.Done(): } + }() + + // wait for both modules to become online (or shutdown) + if !waitForModule(ctx, hook.target) || !waitForModule(ctx, hook.subscriber) { + return } - err := hook.hookingModule.RunWorker( - fmt.Sprintf("event hook %s/%s -> %s/%s", m.Name, event, hook.hookingModule.Name, hook.description), + err := hook.subscriber.RunWorker( + hook.Name(event), func(ctx context.Context) error { - return hook.hookFn(ctx, data) + return hook.callback(ctx, data) }, ) if err != nil { - log.Warningf("%s: failed to execute event hook %s/%s -> %s/%s: %s", hook.hookingModule.Name, m.Name, event, hook.hookingModule.Name, hook.description, err) + log.Warningf("%s: failed to execute %s: %s", hook.target.Name, hook.Name(event), err) } } -// RegisterEvent registers a new event to allow for registering hooks. -func (m *Module) RegisterEvent(event string) { - m.eventHooksLock.Lock() - defer m.eventHooksLock.Unlock() +func (hooks *eventHooks) addSubscription(target, subscriber *Module, event, descr string, fn EventObserverFunc) error { + hooks.Lock() + defer hooks.Unlock() - _, ok := m.eventHooks[event] - if !ok { - m.eventHooks[event] = make([]*eventHook, 0, 1) + if hooks.subscriptions == nil { + return fmt.Errorf("unknown event %q", event) } -} -// RegisterEventHook registers a hook function with (another) modules' event. Whenever a hook is triggered and the receiving module has not yet fully started, hook execution will be delayed until the modules completed starting. -func (m *Module) RegisterEventHook(module string, event string, description string, fn func(context.Context, interface{}) error) error { - // get target module - var eventModule *Module - if module == m.Name { - eventModule = m - } else { - var ok bool - eventModule, ok = modules[module] - if !ok { - return fmt.Errorf(`module "%s" does not exist`, module) - } + if _, ok := hooks.subscriptions[event]; !ok { + return fmt.Errorf("unknown event %q", event) } - // get target event - eventModule.eventHooksLock.Lock() - defer eventModule.eventHooksLock.Unlock() - hooks, ok := eventModule.eventHooks[event] - if !ok { - return fmt.Errorf(`event "%s/%s" does not exist`, eventModule.Name, event) - } + hooks.subscriptions[event] = append( + hooks.subscriptions[event], + &subscription{ + description: descr, + subscriber: subscriber, + target: target, + callback: fn, + }, + ) - // add hook - eventModule.eventHooks[event] = append(hooks, &eventHook{ - description: description, - hookingModule: m, - hookFn: fn, - }) return nil } + +func (hooks *eventHooks) defineEvent(event string) { + hooks.Lock() + defer hooks.Unlock() + + if hooks.subscriptions == nil { + hooks.subscriptions = make(map[string][]*subscription) + } + + if _, ok := hooks.subscriptions[event]; !ok { + hooks.subscriptions[event] = make([]*subscription, 0, 1) + } +} diff --git a/modules/flags.go b/modules/flags.go index 7c5da26f..f11fa8e9 100644 --- a/modules/flags.go +++ b/modules/flags.go @@ -36,7 +36,7 @@ func parseFlags() error { func printGraph() { // mark roots for _, module := range modules { - if len(module.depReverse) == 0 { + if len(module.dependencies.reverse) == 0 { // is root, dont print deps in dep tree module.stopFlag.Set() } diff --git a/modules/mgmt.go b/modules/mgmt.go index 40817413..3a7becdb 100644 --- a/modules/mgmt.go +++ b/modules/mgmt.go @@ -104,7 +104,7 @@ func buildEnabledTree() { } func (m *Module) markDependencies() { - for _, dep := range m.depModules { + for _, dep := range m.dependencies.modules { if dep.enabledAsDependency.SetToIf(false, true) { dep.markDependencies() } diff --git a/modules/microtasks.go b/modules/microtasks.go index cf14832c..d4ed6576 100644 --- a/modules/microtasks.go +++ b/modules/microtasks.go @@ -129,7 +129,7 @@ func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context) func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err error) { // start for module // hint: only microTasks global var is important for scheduling, others can be set here - atomic.AddInt32(m.microTaskCnt, 1) + atomic.AddInt32(m.stats.microTaskCnt, 1) m.waitGroup.Add(1) // set up recovery @@ -144,7 +144,7 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err } // finish for module - atomic.AddInt32(m.microTaskCnt, -1) + atomic.AddInt32(m.stats.microTaskCnt, -1) m.waitGroup.Done() // finish and possibly trigger next task diff --git a/modules/modules.go b/modules/modules.go index c280a4e9..2eddb526 100644 --- a/modules/modules.go +++ b/modules/modules.go @@ -21,55 +21,78 @@ var ( moduleStartTimeout = 2 * time.Minute moduleStopTimeout = 1 * time.Minute +) - // ErrCleanExit is returned by Start() when the program is interrupted before starting. This can happen for example, when using the "--help" flag. +var ( + // ErrCleanExit is returned by Start() when the program is + // interrupted before starting. This can happen for example, + // when using the "--help" flag. ErrCleanExit = errors.New("clean exit requested") ) -// Module represents a module. -type Module struct { //nolint:maligned // not worth the effort - sync.RWMutex - - Name string - - // status mgmt - enabled *abool.AtomicBool - enabledAsDependency *abool.AtomicBool - status uint8 - - // failure status - failureStatus uint8 - failureID string - failureMsg string - - // lifecycle callback functions - prepFn func() error - startFn func() error - stopFn func() error - - // lifecycle mgmt - // start - startComplete chan struct{} - // stop - Ctx context.Context - cancelCtx func() - stopFlag *abool.AtomicBool - - // workers/tasks - workerCnt *int32 - taskCnt *int32 - microTaskCnt *int32 - waitGroup sync.WaitGroup - - // events - eventHooks map[string][]*eventHook - eventHooksLock sync.RWMutex - - // dependency mgmt - depNames []string - depModules []*Module - depReverse []*Module -} +type ( + // LiveCycleFunc is called at specific times during the live-cycle + // of a module. + LiveCycleFunc func() error + + liveCycleHooks struct { + prepFunc LiveCycleFunc + startFunc LiveCycleFunc + stopFunc LiveCycleFunc + } + + depInfo struct { + names []string + modules []*Module + reverse []*Module + } + + failureInfo struct { + failureStatus uint8 + failureID string + failureMsg string + } + + taskStats struct { + workerCnt *int32 + taskCnt *int32 + microTaskCnt *int32 + } + + // Module represents a module. + Module struct { //nolint:maligned // not worth the effort + sync.RWMutex + + // Name is the name of the module and is meant to be + // used for presentation purposes (i.e. user interface). + // Module names must be unique otherwise bad things will + // happen! + Name string + + // status mgmt + enabled *abool.AtomicBool + enabledAsDependency *abool.AtomicBool + status uint8 + + failureInfo + hooks liveCycleHooks + + // lifecycle mgmt + // start + startComplete chan struct{} + // stop + Ctx context.Context + cancelCtx func() + stopFlag *abool.AtomicBool + + // workers/tasks + waitGroup sync.WaitGroup + stats taskStats + + events eventHooks + dependencies depInfo + } +) // StartCompleted returns a channel read that triggers when the module has finished starting. func (m *Module) StartCompleted() <-chan struct{} { @@ -94,7 +117,7 @@ func (m *Module) IsStopping() bool { func (m *Module) Dependencies() []*Module { m.RLock() defer m.RUnlock() - return m.depModules + return m.dependencies.modules } func (m *Module) prep(reports chan *report) { @@ -116,12 +139,12 @@ func (m *Module) prep(reports chan *report) { // run prep function go func() { var err error - if m.prepFn != nil { + if m.hooks.prepFunc != nil { // execute function err = m.runCtrlFnWithTimeout( "prep module", moduleStartTimeout, - m.prepFn, + m.hooks.prepFunc, ) } // set status @@ -172,12 +195,12 @@ func (m *Module) start(reports chan *report) { // run start function go func() { var err error - if m.startFn != nil { + if m.hooks.startFunc != nil { // execute function err = m.runCtrlFnWithTimeout( "start module", moduleStartTimeout, - m.startFn, + m.hooks.startFunc, ) } // set status @@ -232,10 +255,10 @@ func (m *Module) stopAllTasks(reports chan *report) { // start shutdown function stopFnFinished := abool.NewBool(false) var stopFnError error - if m.stopFn != nil { + if m.hooks.stopFunc != nil { m.waitGroup.Add(1) go func() { - stopFnError = m.runCtrlFn("stop module", m.stopFn) + stopFnError = m.runCtrlFn("stop module", m.hooks.stopFunc) stopFnFinished.Set() m.waitGroup.Done() }() @@ -256,9 +279,9 @@ func (m *Module) stopAllTasks(reports chan *report) { "%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v workers=%d tasks=%d microtasks=%d, continuing shutdown...", m.Name, stopFnFinished.IsSet(), - atomic.LoadInt32(m.workerCnt), - atomic.LoadInt32(m.taskCnt), - atomic.LoadInt32(m.microTaskCnt), + atomic.LoadInt32(m.stats.workerCnt), + atomic.LoadInt32(m.stats.taskCnt), + atomic.LoadInt32(m.stats.microTaskCnt), ) } @@ -307,27 +330,29 @@ func Register(name string, prep, start, stop func() error, dependencies ...strin func initNewModule(name string, prep, start, stop func() error, dependencies ...string) *Module { ctx, cancelCtx := context.WithCancel(context.Background()) - var workerCnt int32 - var taskCnt int32 - var microTaskCnt int32 newModule := &Module{ Name: name, enabled: abool.NewBool(false), enabledAsDependency: abool.NewBool(false), - prepFn: prep, - startFn: start, - stopFn: stop, - startComplete: make(chan struct{}), - Ctx: ctx, - cancelCtx: cancelCtx, - stopFlag: abool.NewBool(false), - workerCnt: &workerCnt, - taskCnt: &taskCnt, - microTaskCnt: µTaskCnt, - waitGroup: sync.WaitGroup{}, - eventHooks: make(map[string][]*eventHook), - depNames: dependencies, + hooks: liveCycleHooks{ + prepFunc: prep, + startFunc: start, + stopFunc: stop, + }, + startComplete: make(chan struct{}), + Ctx: ctx, + cancelCtx: cancelCtx, + stopFlag: abool.NewBool(false), + stats: taskStats{ + workerCnt: new(int32), + taskCnt: new(int32), + microTaskCnt: new(int32), + }, + waitGroup: sync.WaitGroup{}, + dependencies: depInfo{ + names: dependencies, + }, } return newModule @@ -335,7 +360,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ... func initDependencies() error { for _, m := range modules { - for _, depName := range m.depNames { + for _, depName := range m.dependencies.names { // get dependency depModule, ok := modules[depName] @@ -344,9 +369,8 @@ func initDependencies() error { } // link together - m.depModules = append(m.depModules, depModule) - depModule.depReverse = append(depModule.depReverse, m) - + m.dependencies.modules = append(m.dependencies.modules, depModule) + depModule.dependencies.reverse = append(depModule.dependencies.reverse, m) } } diff --git a/modules/status.go b/modules/status.go index 94ccc1b1..d2162ec1 100644 --- a/modules/status.go +++ b/modules/status.go @@ -113,7 +113,7 @@ func (m *Module) readyToPrep() uint8 { return statusNothingToDo } - for _, dep := range m.depModules { + for _, dep := range m.dependencies.modules { if dep.Status() < StatusOffline { return statusWaiting } @@ -137,7 +137,7 @@ func (m *Module) readyToStart() uint8 { } // check if all dependencies are ready - for _, dep := range m.depModules { + for _, dep := range m.dependencies.modules { if dep.Status() < StatusOnline { return statusWaiting } @@ -160,7 +160,7 @@ func (m *Module) readyToStop() uint8 { return statusNothingToDo } - for _, revDep := range m.depReverse { + for _, revDep := range m.dependencies.reverse { // not ready if a reverse dependency was started, but not yet stopped if revDep.Status() > StatusOffline { return statusWaiting diff --git a/modules/tasks.go b/modules/tasks.go index b88ea5f2..f3e973ff 100644 --- a/modules/tasks.go +++ b/modules/tasks.go @@ -274,11 +274,9 @@ func (t *Task) runWithLocking() { } // check if module was stopped - select { - case <-t.ctx.Done(): + if t.ctx.Err() != nil { t.lock.Unlock() return - default: } // enter executing state @@ -291,20 +289,16 @@ func (t *Task) runWithLocking() { case <-time.After(maxTimeslotWait): } - // wait for module start - if !t.module.Online() { - if t.module.OnlineSoon() { - // wait - <-t.module.StartCompleted() - } else { - // abort, module will not come online - t.lock.Lock() - t.executing = false - t.lock.Unlock() - return - } + if !t.module.OnlineSoon() { + // abort, module will not come online + t.lock.Lock() + t.executing = false + t.lock.Unlock() + return } + <-t.module.StartCompleted() + // add to queue workgroup queueWg.Add(1) @@ -322,7 +316,7 @@ func (t *Task) runWithLocking() { func (t *Task) executeWithLocking() { // start for module // hint: only queueWg global var is important for scheduling, others can be set here - atomic.AddInt32(t.module.taskCnt, 1) + atomic.AddInt32(t.module.stats.taskCnt, 1) t.module.waitGroup.Add(1) defer func() { @@ -335,7 +329,7 @@ func (t *Task) executeWithLocking() { } // finish for module - atomic.AddInt32(t.module.taskCnt, -1) + atomic.AddInt32(t.module.stats.taskCnt, -1) t.module.waitGroup.Done() t.lock.Lock() diff --git a/modules/tasks_test.go b/modules/tasks_test.go index e1b2952c..70695411 100644 --- a/modules/tasks_test.go +++ b/modules/tasks_test.go @@ -40,6 +40,7 @@ var qtModule *Module func init() { qtModule = initNewModule("task test module", nil, nil, nil) qtModule.status = StatusOnline + close(qtModule.startComplete) } // functions diff --git a/modules/worker.go b/modules/worker.go index 4b8bd437..1ea643d4 100644 --- a/modules/worker.go +++ b/modules/worker.go @@ -38,10 +38,10 @@ func (m *Module) RunWorker(name string, fn func(context.Context) error) error { return errNoModule } - atomic.AddInt32(m.workerCnt, 1) + atomic.AddInt32(m.stats.workerCnt, 1) m.waitGroup.Add(1) defer func() { - atomic.AddInt32(m.workerCnt, -1) + atomic.AddInt32(m.stats.workerCnt, -1) m.waitGroup.Done() }() @@ -59,10 +59,10 @@ func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, } func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) { - atomic.AddInt32(m.workerCnt, 1) + atomic.AddInt32(m.stats.workerCnt, 1) m.waitGroup.Add(1) defer func() { - atomic.AddInt32(m.workerCnt, -1) + atomic.AddInt32(m.stats.workerCnt, -1) m.waitGroup.Done() }() diff --git a/notifications/notification.go b/notifications/notification.go index deba203c..08743f89 100644 --- a/notifications/notification.go +++ b/notifications/notification.go @@ -171,6 +171,10 @@ func (n *Notification) save(pushUpdate bool) *Notification { n.actionFunction = noOpAction } + if n.State == "" { + n.State = Active + } + // Make sure we always have a reasonable expiration set. if n.Expires == 0 { n.Expires = time.Now().Add(72 * time.Hour).Unix()