diff --git a/api/v1/common.go b/api/v1/common.go index ce01d32a..64b1aeae 100644 --- a/api/v1/common.go +++ b/api/v1/common.go @@ -7,6 +7,8 @@ import ( const APIVersion = "crd.chenshaowen.com/v1" const ( + HostKind = "Host" + ClusterKind = "Cluster" TaskKind = "Task" TaskRunKind = "TaskRun" PipelineKind = "Pipeline" diff --git a/controllers/cluster_controller.go b/controllers/cluster_controller.go index 887593ce..cd5dc607 100644 --- a/controllers/cluster_controller.go +++ b/controllers/cluster_controller.go @@ -24,6 +24,7 @@ import ( opsv1 "github.com/shaowenchen/ops/api/v1" opsconstants "github.com/shaowenchen/ops/pkg/constants" + opsevent "github.com/shaowenchen/ops/pkg/event" opskube "github.com/shaowenchen/ops/pkg/kube" opslog "github.com/shaowenchen/ops/pkg/log" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -157,6 +158,18 @@ func (r *ClusterReconciler) updateStatus(logger *opslog.Logger, ctx context.Cont logger.Error.Println(err, "failed to get cluster status") } err = r.commitStatus(logger, ctx, c, status, "") + // push event + go func() { + ebus, err := opsevent.NewEventBus().Builder() + if err != nil { + logger.Error.Println(err, "failed to create event bus for cluster") + return + } + ebus.Publish(ctx, opsevent.EventCluster{ + Server: c.Spec.Server, + ClusterStatus: *status, + }) + }() return } diff --git a/controllers/host_controller.go b/controllers/host_controller.go index 18db8409..38b0b471 100644 --- a/controllers/host_controller.go +++ b/controllers/host_controller.go @@ -22,6 +22,7 @@ import ( "fmt" opsv1 "github.com/shaowenchen/ops/api/v1" opsconstants "github.com/shaowenchen/ops/pkg/constants" + opsevent "github.com/shaowenchen/ops/pkg/event" opshost "github.com/shaowenchen/ops/pkg/host" opslog "github.com/shaowenchen/ops/pkg/log" corev1 "k8s.io/api/core/v1" @@ -183,6 +184,20 @@ func (r *HostReconciler) updateStatus(logger *opslog.Logger, ctx context.Context logger.Error.Println(err, "failed to get host status") } err = r.commitStatus(logger, ctx, h, status, "") + // push event + go func() { + ebus, err := opsevent.NewEventBus().Builder() + if err != nil { + logger.Error.Println(err, "failed to create event bus for host") + return + } + ebus.Publish(ctx, opsevent.EventHost{ + Address: h.Spec.Address, + Port: h.Spec.Port, + Username: h.Spec.Username, + HostStatus: *status, + }) + }() return } diff --git a/controllers/pipelinerun_controller.go b/controllers/pipelinerun_controller.go index 15204386..670170b2 100644 --- a/controllers/pipelinerun_controller.go +++ b/controllers/pipelinerun_controller.go @@ -29,6 +29,7 @@ import ( opsv1 "github.com/shaowenchen/ops/api/v1" opsconstants "github.com/shaowenchen/ops/pkg/constants" opslog "github.com/shaowenchen/ops/pkg/log" + opsevent "github.com/shaowenchen/ops/pkg/event" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -239,6 +240,20 @@ func (r *PipelineRunReconciler) run(logger *opslog.Logger, ctx context.Context, } } r.commitStatus(logger, ctx, pr, finallyStatus, "", "", nil) + // push event + go func() { + ebus, err := opsevent.NewEventBus().Builder() + if err != nil { + logger.Error.Println(err, "failed to create event bus for pipelinerun") + return + } + ebus.Publish(ctx, opsevent.EventPipelineRun{ + Ref: pr.Spec.Ref, + Desc: pr.Spec.Desc, + Variables: pr.Spec.Variables, + PipelineRunStatus: pr.Status, + }) + }() return } diff --git a/controllers/taskrun_controller.go b/controllers/taskrun_controller.go index 9caeb84a..06f5e286 100644 --- a/controllers/taskrun_controller.go +++ b/controllers/taskrun_controller.go @@ -29,6 +29,7 @@ import ( cron "github.com/robfig/cron/v3" opsv1 "github.com/shaowenchen/ops/api/v1" opsconstants "github.com/shaowenchen/ops/pkg/constants" + opsevent "github.com/shaowenchen/ops/pkg/event" opshost "github.com/shaowenchen/ops/pkg/host" opskube "github.com/shaowenchen/ops/pkg/kube" opslog "github.com/shaowenchen/ops/pkg/log" @@ -285,13 +286,27 @@ func (r *TaskRunReconciler) run(logger *opslog.Logger, ctx context.Context, t *o cliLogger.Flush() } // get taskrun status + finallyStatus := opsv1.StatusSuccessed for _, node := range tr.Status.TaskRunNodeStatus { if node.RunStatus != opsv1.StatusSuccessed { - r.commitStatus(logger, ctx, tr, opsv1.StatusFailed) - return + finallyStatus = opsv1.StatusFailed } } - r.commitStatus(logger, ctx, tr, opsv1.StatusSuccessed) + r.commitStatus(logger, ctx, tr, finallyStatus) + // push event + go func() { + ebus, err := opsevent.NewEventBus().Builder() + if err != nil { + logger.Error.Println(err, "failed to create event bus for pipelinerun") + return + } + ebus.Publish(ctx, opsevent.EventTaskRun{ + Ref: tr.Spec.Ref, + Desc: "", + Variables: tr.Spec.Variables, + TaskRunStatus: tr.Status, + }) + }() return } diff --git a/pkg/event/bus.go b/pkg/event/bus.go index 1612ca97..df19d31d 100644 --- a/pkg/event/bus.go +++ b/pkg/event/bus.go @@ -5,6 +5,7 @@ import ( "errors" cenats "github.com/cloudevents/sdk-go/protocol/nats/v2" cloudevents "github.com/cloudevents/sdk-go/v2" + opsv1 "github.com/shaowenchen/ops/api/v1" ) type EventBus struct { @@ -13,7 +14,16 @@ type EventBus struct { Subject string } -func NewEventBus(natsServer string, subject string) (*EventBus, error) { +const DefaultNatsServer = "nats://nats-headless:4222" +const DefaultSubject = "ops.event" + +func NewEventBus() *EventBus { + return &EventBus{} +} + +func (bus *EventBus) Builder() (*EventBus, error) { + natsServer := DefaultNatsServer + subject := DefaultSubject p, err := cenats.NewSender(natsServer, subject, cenats.NatsOptions()) if err != nil { return nil, err @@ -25,10 +35,21 @@ func NewEventBus(natsServer string, subject string) (*EventBus, error) { if err != nil { return nil, err } - return &EventBus{client: c, NatsServer: natsServer, Subject: subject}, nil + if bus == nil { + return &EventBus{ + client: c, + NatsServer: natsServer, + Subject: subject, + }, nil + } else { + bus.client = c + bus.NatsServer = natsServer + bus.Subject = subject + } + return bus, nil } -func (bus *EventBus) Publish(ctx context.Context, event cloudevents.Event) error { +func (bus *EventBus) publishCloudEvent(ctx context.Context, event cloudevents.Event) error { result := bus.client.Send(ctx, event) if cloudevents.IsUndelivered(result) { return errors.New("failed to publish") @@ -36,8 +57,77 @@ func (bus *EventBus) Publish(ctx context.Context, event cloudevents.Event) error return nil } -func (bus *EventBus) Subscribe(ctx context.Context, handler func(ctx context.Context, event cloudevents.Event)) error { - for { - bus.client.StartReceiver(ctx, handler) +func (bus *EventBus) Publish(ctx context.Context, data interface{}) error { + event, err := builderEvent(data) + if err != nil { + return err + } + return bus.publishCloudEvent(ctx, event) +} + +func (bus *EventBus) getCloudEvent(ctx context.Context, eventType string) (*cloudevents.Event, error) { + var receivedEvent *cloudevents.Event + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err := bus.client.StartReceiver(ctx, func(event cloudevents.Event) cloudevents.Result { + if event.Type() == eventType { + receivedEvent = &event + cancel() + return cloudevents.ResultACK + } + return cloudevents.ResultNACK + }) + return receivedEvent, err +} + +func (bus *EventBus) GetHost(ctx context.Context) (*EventHost, error) { + event, err := bus.getCloudEvent(ctx, opsv1.HostKind) + if err != nil { + return nil, err + } + var data EventHost + err = event.DataAs(&data) + if err != nil { + return nil, err + } + return &data, nil +} + +func (bus *EventBus) GetCluster(ctx context.Context) (*EventCluster, error) { + event, err := bus.getCloudEvent(ctx, opsv1.ClusterKind) + if err != nil { + return nil, err + } + var data EventCluster + err = event.DataAs(&data) + if err != nil { + return nil, err + } + return &data, nil +} + +func (bus *EventBus) GetTaskRun(ctx context.Context) (*EventTaskRun, error) { + event, err := bus.getCloudEvent(ctx, opsv1.TaskRunKind) + if err != nil { + return nil, err + } + var data EventTaskRun + err = event.DataAs(&data) + if err != nil { + return nil, err + } + return &data, nil +} + +func (bus *EventBus) GetPipelineRun(ctx context.Context) (*EventPipelineRun, error) { + event, err := bus.getCloudEvent(ctx, opsv1.PipelineRunKind) + if err != nil { + return nil, err + } + var data EventPipelineRun + err = event.DataAs(&data) + if err != nil { + return nil, err } + return &data, nil } diff --git a/pkg/event/event.go b/pkg/event/event.go index fc6e93af..7f70b874 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -4,48 +4,53 @@ import ( "fmt" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/google/uuid" + opsv1 "github.com/shaowenchen/ops/api/v1" ) -type EventPipelineRun struct { - Ref string `json:"ref"` - Desc string `json:"desc"` - Variables string `json:"variables"` +type EventHost struct { + Address string `json:"address,omitempty" yaml:"address,omitempty"` + Port int `json:"port,omitempty" yaml:"port,omitempty"` + Username string `json:"username,omitempty" yaml:"username,omitempty"` + opsv1.HostStatus } +type EventCluster struct { + Server string `json:"server,omitempty" yaml:"server,omitempty" ` + opsv1.ClusterStatus +} type EventTaskRun struct { - Ref string `json:"ref"` - Desc string `json:"desc"` - Variables string `json:"variables"` + Ref string `json:"ref"` + Desc string `json:"desc"` + Variables map[string]string `json:"variables"` + opsv1.TaskRunStatus } -type EventInspection struct { - TypeRef string `json:"typeRef"` - NameRef string `json:"nameRef"` - NodeName string `json:"nodeName"` - Variables string `json:"variables"` - ThresholdValue string `json:"thresholdValue"` - Comparator string `json:"comparator"` - CurrentValue string `json:"currentValue"` - Status string `json:"status"` - Priority string `json:"priority"` +type EventPipelineRun struct { + Ref string `json:"ref"` + Desc string `json:"desc"` + Variables map[string]string `json:"variables"` + opsv1.PipelineRunStatus } -func BuilderEvent(data interface{}) (cloudevents.Event, error) { +func builderEvent(data interface{}) (cloudevents.Event, error) { e := cloudevents.NewEvent() e.SetID(uuid.New().String()) - e.SetSource("https://www.chenshaowen.com/ops/") + e.SetSource(opsv1.APIVersion) var eventType string switch v := data.(type) { - case EventInspection: - eventType = "ops.inspection" - case *EventInspection: - eventType = "ops.inspection" + case *EventHost, EventHost: + eventType = opsv1.TaskKind + case *EventCluster, EventCluster: + eventType = opsv1.ClusterKind + case *EventTaskRun, EventTaskRun: + eventType = opsv1.TaskRunKind + case *EventPipelineRun, EventPipelineRun: + eventType = opsv1.PipelineRunKind default: - eventType = "ops.unknown" return e, fmt.Errorf("unsupported data type: %T", v) } e.SetType(eventType) - err := e.SetData("application/json", data) + err := e.SetData(cloudevents.ApplicationJSON, data) return e, err }