Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Refactor modules and module status #80

Open
wants to merge 1 commit into
base: feature/ui-revamp
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 132 additions & 89 deletions modules/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,80 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/safing/portbase/log"
)

type eventHookFn func(context.Context, interface{}) error
type (
// EventObserverFunc can be registered for one or more event types
// and will be called with the event payload.
// Any error returned from the observer function will be logged.
EventObserverFunc func(context.Context, interface{}) error

type eventHook struct {
description string
hookingModule *Module
hookFn eventHookFn
}
// eventHooks keeps track of registered event subscriptions.
eventHooks struct {
sync.RWMutex
subscriptions map[string][]*subscription
}

// TriggerEvent executes all hook functions registered to the specified event.
func (m *Module) TriggerEvent(event string, data interface{}) {
if m.OnlineSoon() {
go m.processEventTrigger(event, data)
// subscription defines the subscription to an observable.
// Any time the observable emits an event the subscriptions
// callback is called.
subscription struct {
// description is a human readable description of the
// subscription purpose. This is mainly used for logging
// purposes.
description string
// subscriber is a reference to the module that placed
// the subscription.
subscriber *Module
// target holds a reference to the module that is
// observed by this subscription
target *Module
// callback is the function to execute when the observed
// event occurs.
callback EventObserverFunc
}
}
)

func (m *Module) processEventTrigger(event string, data interface{}) {
m.eventHooksLock.RLock()
defer m.eventHooksLock.RUnlock()
// RegisterEvent registers a new event to allow for registering hooks.
func (m *Module) RegisterEvent(event string) {
m.events.defineEvent(event)
}

hooks, ok := m.eventHooks[event]
if !ok {
log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, event)
return
// RegisterEventHook registers a hook function with (another) modules'
// event. Whenever a hook is triggered and the receiving module has not
// yet fully started, hook execution will be delayed until the modules
// completed starting.
func (m *Module) RegisterEventHook(module, event, description string, fn EventObserverFunc) error {
targetModule := m
if module != m.Name {
var ok bool
// TODO(ppacher): accessing modules[module] here without any
// kind of protection seems wrong.... Check with
// @dhaavi.
targetModule, ok = modules[module]
if !ok {
return fmt.Errorf(`module "%s" does not exist`, module)
}
}

for _, hook := range hooks {
if hook.hookingModule.OnlineSoon() {
go m.runEventHook(hook, event, data)
}
return targetModule.events.addSubscription(targetModule, m, event, description, fn)
}

// TriggerEvent executes all hook functions registered to the
// specified event.
func (m *Module) TriggerEvent(event string, data interface{}) {
if m.OnlineSoon() {
go m.processEventTrigger(event, event, data)
}
}

// InjectEvent triggers an event from a foreign module and executes all hook functions registered to that event.
// InjectEvent triggers an event from a foreign module and executes
// all hook functions registered to that event.
// Note that sourceEventName is only used for logging purposes while
// targetModuleName and targetEventName must actually exist.
func (m *Module) InjectEvent(sourceEventName, targetModuleName, targetEventName string, data interface{}) error {
if !m.OnlineSoon() {
return errors.New("module not yet started")
Expand All @@ -55,99 +92,105 @@ func (m *Module) InjectEvent(sourceEventName, targetModuleName, targetEventName
return fmt.Errorf(`module "%s" does not exist`, targetModuleName)
}

targetModule.eventHooksLock.RLock()
defer targetModule.eventHooksLock.RUnlock()
targetModule.processEventTrigger(targetEventName, sourceEventName, data)

targetHooks, ok := targetModule.eventHooks[targetEventName]
return nil
}

func (m *Module) processEventTrigger(eventID, eventName string, data interface{}) {
m.events.RLock()
defer m.events.RUnlock()

hooks, ok := m.events.subscriptions[eventID]
if !ok {
return fmt.Errorf(`module "%s" has no event named "%s"`, targetModuleName, targetEventName)
log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, eventID)
return
}

for _, hook := range targetHooks {
if hook.hookingModule.OnlineSoon() {
go m.runEventHook(hook, sourceEventName, data)
for _, hook := range hooks {
if hook.subscriber.OnlineSoon() {
go hook.runEventHook(eventName, data)
}
}
}

return nil
func (hook *subscription) Name(event string) string {
return fmt.Sprintf("event hook %s/%s -> %s/%s", hook.target.Name, event, hook.subscriber.Name, hook.description)
}

func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) {
// check if source module is ready for handling
if m.Status() != StatusOnline {
// target module has not yet fully started, wait until start is complete
select {
case <-m.StartCompleted():
// continue with hook execution
case <-hook.hookingModule.Stopping():
return
case <-m.Stopping():
return
}
func waitForModule(ctx context.Context, m *Module) bool {
select {
case <-ctx.Done():
return false
case <-m.StartCompleted():
return true
}
}

func (hook *subscription) runEventHook(event string, data interface{}) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// check if destionation module is ready for handling
if hook.hookingModule.Status() != StatusOnline {
// target module has not yet fully started, wait until start is complete
go func() {
select {
case <-hook.hookingModule.StartCompleted():
// continue with hook execution
case <-hook.hookingModule.Stopping():
return
case <-m.Stopping():
return
case <-hook.subscriber.Stopping():
cancel()
case <-hook.target.Stopping():
cancel()
case <-ctx.Done():
}
}()

// wait for both modules to become online (or shutdown)
if !waitForModule(ctx, hook.target) || !waitForModule(ctx, hook.subscriber) {
return
}

err := hook.hookingModule.RunWorker(
fmt.Sprintf("event hook %s/%s -> %s/%s", m.Name, event, hook.hookingModule.Name, hook.description),
err := hook.subscriber.RunWorker(
hook.Name(event),
func(ctx context.Context) error {
return hook.hookFn(ctx, data)
return hook.callback(ctx, data)
},
)
if err != nil {
log.Warningf("%s: failed to execute event hook %s/%s -> %s/%s: %s", hook.hookingModule.Name, m.Name, event, hook.hookingModule.Name, hook.description, err)
log.Warningf("%s: failed to execute %s: %s", hook.target.Name, hook.Name(event), err)
}
}

// RegisterEvent registers a new event to allow for registering hooks.
func (m *Module) RegisterEvent(event string) {
m.eventHooksLock.Lock()
defer m.eventHooksLock.Unlock()
func (hooks *eventHooks) addSubscription(target, subscriber *Module, event, descr string, fn EventObserverFunc) error {
hooks.Lock()
defer hooks.Unlock()

_, ok := m.eventHooks[event]
if !ok {
m.eventHooks[event] = make([]*eventHook, 0, 1)
if hooks.subscriptions == nil {
return fmt.Errorf("unknown event %q", event)
}
}

// RegisterEventHook registers a hook function with (another) modules' event. Whenever a hook is triggered and the receiving module has not yet fully started, hook execution will be delayed until the modules completed starting.
func (m *Module) RegisterEventHook(module string, event string, description string, fn func(context.Context, interface{}) error) error {
// get target module
var eventModule *Module
if module == m.Name {
eventModule = m
} else {
var ok bool
eventModule, ok = modules[module]
if !ok {
return fmt.Errorf(`module "%s" does not exist`, module)
}
if _, ok := hooks.subscriptions[event]; !ok {
return fmt.Errorf("unknown event %q", event)
}

// get target event
eventModule.eventHooksLock.Lock()
defer eventModule.eventHooksLock.Unlock()
hooks, ok := eventModule.eventHooks[event]
if !ok {
return fmt.Errorf(`event "%s/%s" does not exist`, eventModule.Name, event)
}
hooks.subscriptions[event] = append(
hooks.subscriptions[event],
&subscription{
description: descr,
subscriber: subscriber,
target: target,
callback: fn,
},
)

// add hook
eventModule.eventHooks[event] = append(hooks, &eventHook{
description: description,
hookingModule: m,
hookFn: fn,
})
return nil
}

func (hooks *eventHooks) defineEvent(event string) {
hooks.Lock()
defer hooks.Unlock()

if hooks.subscriptions == nil {
hooks.subscriptions = make(map[string][]*subscription)
}

if _, ok := hooks.subscriptions[event]; !ok {
hooks.subscriptions[event] = make([]*subscription, 0, 1)
}
}
2 changes: 1 addition & 1 deletion modules/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func parseFlags() error {
func printGraph() {
// mark roots
for _, module := range modules {
if len(module.depReverse) == 0 {
if len(module.dependencies.reverse) == 0 {
// is root, dont print deps in dep tree
module.stopFlag.Set()
}
Expand Down
2 changes: 1 addition & 1 deletion modules/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func buildEnabledTree() {
}

func (m *Module) markDependencies() {
for _, dep := range m.depModules {
for _, dep := range m.dependencies.modules {
if dep.enabledAsDependency.SetToIf(false, true) {
dep.markDependencies()
}
Expand Down
4 changes: 2 additions & 2 deletions modules/microtasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context)
func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err error) {
// start for module
// hint: only microTasks global var is important for scheduling, others can be set here
atomic.AddInt32(m.microTaskCnt, 1)
atomic.AddInt32(m.stats.microTaskCnt, 1)
m.waitGroup.Add(1)

// set up recovery
Expand All @@ -144,7 +144,7 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err
}

// finish for module
atomic.AddInt32(m.microTaskCnt, -1)
atomic.AddInt32(m.stats.microTaskCnt, -1)
m.waitGroup.Done()

// finish and possibly trigger next task
Expand Down
Loading