Skip to content

Commit

Permalink
add event
Browse files Browse the repository at this point in the history
  • Loading branch information
shaowenchen committed Sep 25, 2024
1 parent b451b09 commit b5092d1
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 34 deletions.
2 changes: 2 additions & 0 deletions api/v1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
const APIVersion = "crd.chenshaowen.com/v1"

const (
HostKind = "Host"
ClusterKind = "Cluster"
TaskKind = "Task"
TaskRunKind = "TaskRun"
PipelineKind = "Pipeline"
Expand Down
13 changes: 13 additions & 0 deletions controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

opsv1 "github.com/shaowenchen/ops/api/v1"
opsconstants "github.com/shaowenchen/ops/pkg/constants"
opsevent "github.com/shaowenchen/ops/pkg/event"
opskube "github.com/shaowenchen/ops/pkg/kube"
opslog "github.com/shaowenchen/ops/pkg/log"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -157,6 +158,18 @@ func (r *ClusterReconciler) updateStatus(logger *opslog.Logger, ctx context.Cont
logger.Error.Println(err, "failed to get cluster status")
}
err = r.commitStatus(logger, ctx, c, status, "")
// push event
go func() {
ebus, err := opsevent.NewEventBus().Builder()
if err != nil {
logger.Error.Println(err, "failed to create event bus for cluster")
return
}
ebus.Publish(ctx, opsevent.EventCluster{
Server: c.Spec.Server,
ClusterStatus: *status,
})
}()
return
}

Expand Down
15 changes: 15 additions & 0 deletions controllers/host_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
opsv1 "github.com/shaowenchen/ops/api/v1"
opsconstants "github.com/shaowenchen/ops/pkg/constants"
opsevent "github.com/shaowenchen/ops/pkg/event"
opshost "github.com/shaowenchen/ops/pkg/host"
opslog "github.com/shaowenchen/ops/pkg/log"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -183,6 +184,20 @@ func (r *HostReconciler) updateStatus(logger *opslog.Logger, ctx context.Context
logger.Error.Println(err, "failed to get host status")
}
err = r.commitStatus(logger, ctx, h, status, "")
// push event
go func() {
ebus, err := opsevent.NewEventBus().Builder()
if err != nil {
logger.Error.Println(err, "failed to create event bus for host")
return
}
ebus.Publish(ctx, opsevent.EventHost{
Address: h.Spec.Address,
Port: h.Spec.Port,
Username: h.Spec.Username,
HostStatus: *status,
})
}()
return
}

Expand Down
15 changes: 15 additions & 0 deletions controllers/pipelinerun_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
opsv1 "github.com/shaowenchen/ops/api/v1"
opsconstants "github.com/shaowenchen/ops/pkg/constants"
opslog "github.com/shaowenchen/ops/pkg/log"
opsevent "github.com/shaowenchen/ops/pkg/event"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -239,6 +240,20 @@ func (r *PipelineRunReconciler) run(logger *opslog.Logger, ctx context.Context,
}
}
r.commitStatus(logger, ctx, pr, finallyStatus, "", "", nil)
// push event
go func() {
ebus, err := opsevent.NewEventBus().Builder()
if err != nil {
logger.Error.Println(err, "failed to create event bus for pipelinerun")
return
}
ebus.Publish(ctx, opsevent.EventPipelineRun{
Ref: pr.Spec.Ref,
Desc: pr.Spec.Desc,
Variables: pr.Spec.Variables,
PipelineRunStatus: pr.Status,
})
}()
return
}

Expand Down
21 changes: 18 additions & 3 deletions controllers/taskrun_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
cron "github.com/robfig/cron/v3"
opsv1 "github.com/shaowenchen/ops/api/v1"
opsconstants "github.com/shaowenchen/ops/pkg/constants"
opsevent "github.com/shaowenchen/ops/pkg/event"
opshost "github.com/shaowenchen/ops/pkg/host"
opskube "github.com/shaowenchen/ops/pkg/kube"
opslog "github.com/shaowenchen/ops/pkg/log"
Expand Down Expand Up @@ -285,13 +286,27 @@ func (r *TaskRunReconciler) run(logger *opslog.Logger, ctx context.Context, t *o
cliLogger.Flush()
}
// get taskrun status
finallyStatus := opsv1.StatusSuccessed
for _, node := range tr.Status.TaskRunNodeStatus {
if node.RunStatus != opsv1.StatusSuccessed {
r.commitStatus(logger, ctx, tr, opsv1.StatusFailed)
return
finallyStatus = opsv1.StatusFailed
}
}
r.commitStatus(logger, ctx, tr, opsv1.StatusSuccessed)
r.commitStatus(logger, ctx, tr, finallyStatus)
// push event
go func() {
ebus, err := opsevent.NewEventBus().Builder()
if err != nil {
logger.Error.Println(err, "failed to create event bus for pipelinerun")
return
}
ebus.Publish(ctx, opsevent.EventTaskRun{
Ref: tr.Spec.Ref,
Desc: "",
Variables: tr.Spec.Variables,
TaskRunStatus: tr.Status,
})
}()
return
}

Expand Down
102 changes: 96 additions & 6 deletions pkg/event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
opsv1 "github.com/shaowenchen/ops/api/v1"
)

type EventBus struct {
Expand All @@ -13,7 +14,16 @@ type EventBus struct {
Subject string
}

func NewEventBus(natsServer string, subject string) (*EventBus, error) {
const DefaultNatsServer = "nats://nats-headless:4222"
const DefaultSubject = "ops.event"

func NewEventBus() *EventBus {
return &EventBus{}
}

func (bus *EventBus) Builder() (*EventBus, error) {
natsServer := DefaultNatsServer
subject := DefaultSubject
p, err := cenats.NewSender(natsServer, subject, cenats.NatsOptions())
if err != nil {
return nil, err
Expand All @@ -25,19 +35,99 @@ func NewEventBus(natsServer string, subject string) (*EventBus, error) {
if err != nil {
return nil, err
}
return &EventBus{client: c, NatsServer: natsServer, Subject: subject}, nil
if bus == nil {
return &EventBus{
client: c,
NatsServer: natsServer,
Subject: subject,
}, nil
} else {
bus.client = c
bus.NatsServer = natsServer
bus.Subject = subject
}
return bus, nil
}

func (bus *EventBus) Publish(ctx context.Context, event cloudevents.Event) error {
func (bus *EventBus) publishCloudEvent(ctx context.Context, event cloudevents.Event) error {
result := bus.client.Send(ctx, event)
if cloudevents.IsUndelivered(result) {
return errors.New("failed to publish")
}
return nil
}

func (bus *EventBus) Subscribe(ctx context.Context, handler func(ctx context.Context, event cloudevents.Event)) error {
for {
bus.client.StartReceiver(ctx, handler)
func (bus *EventBus) Publish(ctx context.Context, data interface{}) error {
event, err := builderEvent(data)
if err != nil {
return err
}
return bus.publishCloudEvent(ctx, event)
}

func (bus *EventBus) getCloudEvent(ctx context.Context, eventType string) (*cloudevents.Event, error) {
var receivedEvent *cloudevents.Event
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err := bus.client.StartReceiver(ctx, func(event cloudevents.Event) cloudevents.Result {
if event.Type() == eventType {
receivedEvent = &event
cancel()
return cloudevents.ResultACK
}
return cloudevents.ResultNACK
})
return receivedEvent, err
}

func (bus *EventBus) GetHost(ctx context.Context) (*EventHost, error) {
event, err := bus.getCloudEvent(ctx, opsv1.HostKind)
if err != nil {
return nil, err
}
var data EventHost
err = event.DataAs(&data)
if err != nil {
return nil, err
}
return &data, nil
}

func (bus *EventBus) GetCluster(ctx context.Context) (*EventCluster, error) {
event, err := bus.getCloudEvent(ctx, opsv1.ClusterKind)
if err != nil {
return nil, err
}
var data EventCluster
err = event.DataAs(&data)
if err != nil {
return nil, err
}
return &data, nil
}

func (bus *EventBus) GetTaskRun(ctx context.Context) (*EventTaskRun, error) {
event, err := bus.getCloudEvent(ctx, opsv1.TaskRunKind)
if err != nil {
return nil, err
}
var data EventTaskRun
err = event.DataAs(&data)
if err != nil {
return nil, err
}
return &data, nil
}

func (bus *EventBus) GetPipelineRun(ctx context.Context) (*EventPipelineRun, error) {
event, err := bus.getCloudEvent(ctx, opsv1.PipelineRunKind)
if err != nil {
return nil, err
}
var data EventPipelineRun
err = event.DataAs(&data)
if err != nil {
return nil, err
}
return &data, nil
}
55 changes: 30 additions & 25 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,53 @@ import (
"fmt"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
opsv1 "github.com/shaowenchen/ops/api/v1"
)

type EventPipelineRun struct {
Ref string `json:"ref"`
Desc string `json:"desc"`
Variables string `json:"variables"`
type EventHost struct {
Address string `json:"address,omitempty" yaml:"address,omitempty"`
Port int `json:"port,omitempty" yaml:"port,omitempty"`
Username string `json:"username,omitempty" yaml:"username,omitempty"`
opsv1.HostStatus
}

type EventCluster struct {
Server string `json:"server,omitempty" yaml:"server,omitempty" `
opsv1.ClusterStatus
}
type EventTaskRun struct {
Ref string `json:"ref"`
Desc string `json:"desc"`
Variables string `json:"variables"`
Ref string `json:"ref"`
Desc string `json:"desc"`
Variables map[string]string `json:"variables"`
opsv1.TaskRunStatus
}

type EventInspection struct {
TypeRef string `json:"typeRef"`
NameRef string `json:"nameRef"`
NodeName string `json:"nodeName"`
Variables string `json:"variables"`
ThresholdValue string `json:"thresholdValue"`
Comparator string `json:"comparator"`
CurrentValue string `json:"currentValue"`
Status string `json:"status"`
Priority string `json:"priority"`
type EventPipelineRun struct {
Ref string `json:"ref"`
Desc string `json:"desc"`
Variables map[string]string `json:"variables"`
opsv1.PipelineRunStatus
}

func BuilderEvent(data interface{}) (cloudevents.Event, error) {
func builderEvent(data interface{}) (cloudevents.Event, error) {
e := cloudevents.NewEvent()
e.SetID(uuid.New().String())
e.SetSource("https://www.chenshaowen.com/ops/")
e.SetSource(opsv1.APIVersion)

var eventType string
switch v := data.(type) {
case EventInspection:
eventType = "ops.inspection"
case *EventInspection:
eventType = "ops.inspection"
case *EventHost, EventHost:
eventType = opsv1.TaskKind
case *EventCluster, EventCluster:
eventType = opsv1.ClusterKind
case *EventTaskRun, EventTaskRun:
eventType = opsv1.TaskRunKind
case *EventPipelineRun, EventPipelineRun:
eventType = opsv1.PipelineRunKind
default:
eventType = "ops.unknown"
return e, fmt.Errorf("unsupported data type: %T", v)
}
e.SetType(eventType)
err := e.SetData("application/json", data)
err := e.SetData(cloudevents.ApplicationJSON, data)
return e, err
}

0 comments on commit b5092d1

Please sign in to comment.