diff --git a/.k8s/deployment.yaml b/.k8s/deployment.yaml new file mode 100644 index 0000000..2df0488 --- /dev/null +++ b/.k8s/deployment.yaml @@ -0,0 +1,71 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: secap-input + namespace: secap-compass +spec: + replicas: 1 + selector: + matchLabels: + app: secap-input + template: + metadata: + labels: + app: secap-input + spec: + containers: + - name: secap-input + image: secap-input:latest + imagePullPolicy: IfNotPresent + ports: + - containerPort: 8001 + name: http + protocol: TCP + env: + - name: EVENTSTORE_CONNECTION_STRING + value: esdb://eventstore-db.persistence:2113 + resources: + limits: + cpu: "200m" + memory: "1000Mi" + requests: + cpu: "50m" + memory: "50Mi" +--- +apiVersion: v1 +kind: Service +metadata: + name: secap-input + namespace: secap-compass +spec: + selector: + app: secap-input + ports: + - protocol: TCP + name: http + port: 8001 + targetPort: 8001 + + type: ClusterIP +--- +# Existing Deployment and Service definitions + +#--- +#apiVersion: autoscaling/v2 +#kind: HorizontalPodAutoscaler +#metadata: +# name: secap-input +#spec: +# scaleTargetRef: +# apiVersion: apps/v1 +# kind: Deployment +# name: secap-input +# minReplicas: 1 +# maxReplicas: 10 +# metrics: +# - type: Resource +# resource: +# name: cpu +# target: +# type: Utilization +# averageUtilization: 80 \ No newline at end of file diff --git a/cmd/api/main.go b/cmd/api/main.go index a9859fd..41c591f 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/gofiber/fiber/v2/middleware/pprof" "os" "secap-input/internal/server" "strconv" @@ -12,9 +13,15 @@ import ( func main() { server := server.New() + server.Use(pprof.New()) server.RegisterFiberRoutes() - port, _ := strconv.Atoi(os.Getenv("PORT")) + p := os.Getenv("PORT") + if p == "" { + p = "8001" + } + + port, _ := strconv.Atoi(p) err := server.Listen(fmt.Sprintf(":%d", port)) if err != nil { panic(fmt.Sprintf("cannot start server: %s", err)) diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000..994c159 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,9 @@ +kubectl apply -f eventstoredb.deployment.yaml -n persistence + +docker image rm secap-input:latest +docker build -t secap-input:latest . + +minikube image rm secap-input:latest +minikube image load secap-input:latest + +kubectl apply -f .k8s/deployment.yaml -n secap-compass \ No newline at end of file diff --git a/eventstoredb.deployment.yaml b/eventstoredb.deployment.yaml new file mode 100644 index 0000000..f01e138 --- /dev/null +++ b/eventstoredb.deployment.yaml @@ -0,0 +1,73 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: eventstore-db +spec: + serviceName: eventstore-db + replicas: 1 + selector: + matchLabels: + app: eventstore-db + template: + metadata: + labels: + app: eventstore-db + spec: + containers: + - name: eventstore-db + image: eventstore/eventstore:lts + env: + - name: EVENTSTORE_CLUSTER_SIZE + value: "1" + - name: EVENTSTORE_RUN_PROJECTIONS + value: "All" + - name: EVENTSTORE_START_STANDARD_PROJECTIONS + value: "true" + - name: EVENTSTORE_HTTP_PORT + value: "2113" + - name: EVENTSTORE_INSECURE + value: "true" + - name: EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP + value: "true" + - name: EVENTSTORE_ALLOW_UNKNOWN_OPTIONS + value: "true" + ports: + - containerPort: 2113 + name: http + protocol: TCP + - containerPort: 1113 + name: grpc + protocol: TCP + volumeMounts: + - mountPath: /var/lib/eventstore + name: eventstore-volume-data + - mountPath: /var/log/eventstore + name: eventstore-volume-logs + volumeClaimTemplates: + - metadata: + name: eventstore-volume-data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 10Gi + - metadata: + name: eventstore-volume-logs + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 1Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: eventstore-db +spec: + selector: + app: eventstore-db + ports: + - protocol: TCP + port: 2113 + targetPort: 2113 + type: ClusterIP diff --git a/go.mod b/go.mod index 68b0ff6..c4f60d7 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,10 @@ go 1.22.1 require ( github.com/EventStore/EventStore-Client-Go/v4 v4.0.0 github.com/gofiber/fiber/v2 v2.52.4 - github.com/gofrs/uuid v4.4.0+incompatible github.com/google/uuid v1.6.0 github.com/joho/godotenv v1.5.1 github.com/json-iterator/go v1.1.12 github.com/pkg/errors v0.9.1 - golang.org/x/sync v0.6.0 ) require ( diff --git a/internal/common/esdb/esdb.go b/internal/common/esdb/esdb.go index 93c30fd..a428135 100644 --- a/internal/common/esdb/esdb.go +++ b/internal/common/esdb/esdb.go @@ -4,12 +4,24 @@ import ( "context" "github.com/EventStore/EventStore-Client-Go/v4/esdb" "github.com/gofiber/fiber/v2/log" + "log/slog" + "os" + "os/exec" ) type SubscriptionWorker func(ctx context.Context, stream *esdb.PersistentSubscription, workerID int) error func ConnectESDB() *esdb.Client { - cfg, err := esdb.ParseConnectionString("esdb://localhost:2113?tls=false") + + out, _ := exec.Command("ls").Output() + slog.Info(string(out)) + out, _ = exec.Command("pwd").Output() + slog.Info(string(out)) + + connectionString := os.Getenv("EVENTSTORE_CONNECTION_STRING") + slog.Info("", slog.String("Eventstoredb connection string", connectionString)) + + cfg, err := esdb.ParseConnectionString(connectionString + "?tls=false") if err != nil { log.Fatal(err) } diff --git a/internal/common/infrastructure/repository/aggregate.go b/internal/common/infrastructure/repository/aggregate.go index bcd34a9..35d6c58 100644 --- a/internal/common/infrastructure/repository/aggregate.go +++ b/internal/common/infrastructure/repository/aggregate.go @@ -99,7 +99,8 @@ func (r *AggregateRepository) Save(ctx context.Context, a eventsourcing.Aggregat func (r *AggregateRepository) Exists(ctx context.Context, aggregateId uuid.UUID) bool { readStreamOptions := esdb.ReadStreamOptions{Direction: esdb.Backwards, From: esdb.Revision(1)} // error nil means stream exists? - _, err := r.db.ReadStream(ctx, aggregateId.String(), readStreamOptions, 1) + stream, err := r.db.ReadStream(ctx, aggregateId.String(), readStreamOptions, 1) + defer stream.Close() if err == nil { return false diff --git a/internal/domain/building/core/events/building_measured_event.go b/internal/domain/building/core/events/building_measured_event.go index 52b4b40..33d6c08 100644 --- a/internal/domain/building/core/events/building_measured_event.go +++ b/internal/domain/building/core/events/building_measured_event.go @@ -2,6 +2,7 @@ package events import ( "context" + "github.com/iancoleman/strcase" "secap-input/internal/common/eventsourcing" "secap-input/internal/domain/building/core/model" ) @@ -13,8 +14,8 @@ type BuildingMeasuredEvent struct { const BuildingMeasuredEventType = "building.measured" func NewBuildingMeasuredEvent(ctx context.Context, a *eventsourcing.AggregateBase, measurement *model.Measurement) (*eventsourcing.Event, error) { - //measurement.MeasurementTypeHeader = model.MeasurementTypeHeader(strcase.ToCamel(string(measurement.MeasurementTypeHeader))) - //measurement.MeasurementType = model.MeasurementType(strcase.ToCamel(string(measurement.MeasurementType))) + measurement.MeasurementTypeHeader = model.MeasurementTypeHeader(strcase.ToCamel(string(measurement.MeasurementTypeHeader))) + measurement.MeasurementType = model.MeasurementType(strcase.ToCamel(string(measurement.MeasurementType))) eventData := &BuildingMeasuredEvent{ Measurement: measurement, } diff --git a/internal/domain/building/core/model/measurement.go b/internal/domain/building/core/model/measurement.go index 680a11c..bc70826 100644 --- a/internal/domain/building/core/model/measurement.go +++ b/internal/domain/building/core/model/measurement.go @@ -1,21 +1,26 @@ package model -import "fmt" +import ( + "fmt" + "time" +) type Measurement struct { Unit string `json:"unit"` Value float64 `json:"value"` MeasurementTypeHeader MeasurementTypeHeader `json:"measurementTypeHeader"` MeasurementType MeasurementType `json:"measurementType"` + Timestamp time.Time `json:"timestamp"` } -func NewMeasurement(unit string, value float64, mt string, mth string) (*Measurement, error) { +func NewMeasurement(unit string, value float64, mt string, mth string, t time.Time) (*Measurement, error) { return &Measurement{ Unit: unit, Value: value, MeasurementType: MeasurementType(mt), MeasurementTypeHeader: MeasurementTypeHeader(mth), + Timestamp: t, }, nil } diff --git a/internal/domain/building/infrastructure/measurement_type_provider.go b/internal/domain/building/infrastructure/measurement_type_provider.go index af80388..5a9136b 100644 --- a/internal/domain/building/infrastructure/measurement_type_provider.go +++ b/internal/domain/building/infrastructure/measurement_type_provider.go @@ -22,7 +22,7 @@ func NewMeasurementTypeProvider() ports.MeasurementTypeProvider { } func (m *measurementTypeProvider) fetchConfig() { - file, err := os.ReadFile("config/building_measurement_types/en.json") // Can this be multi-language? + file, err := os.ReadFile("./config/building_measurement_types/en.json") // Can this be multi-language? if err != nil { panic(err) } diff --git a/internal/server/request/MeasureBuildingRequest.go b/internal/server/request/MeasureBuildingRequest.go index 8e4372a..26730e1 100644 --- a/internal/server/request/MeasureBuildingRequest.go +++ b/internal/server/request/MeasureBuildingRequest.go @@ -1,18 +1,22 @@ package request -import "secap-input/internal/domain/building/core/model" +import ( + "secap-input/internal/domain/building/core/model" + "time" +) type MeasureBuildingRequest struct { Measurements []MeasurementDTO `json:"measurements"` } type MeasurementDTO struct { - Value float64 `json:"amount"` - Unit string `json:"unit"` - Type string `json:"type"` - TypeHeader string `json:"typeHeader"` + Value float64 `json:"value"` + Unit string `json:"unit"` + Type string `json:"measurementType"` + TypeHeader string `json:"measurementTypeHeader"` + Timestamp time.Time `json:"timestamp"` } func (m *MeasurementDTO) ToModel() (*model.Measurement, error) { - return model.NewMeasurement(m.Unit, m.Value, m.Type, m.TypeHeader) + return model.NewMeasurement(m.Unit, m.Value, m.Type, m.TypeHeader, m.Timestamp) } diff --git a/internal/server/routes.go b/internal/server/routes.go index d331b27..63bf7a8 100644 --- a/internal/server/routes.go +++ b/internal/server/routes.go @@ -87,7 +87,7 @@ func (s *FiberServer) measureBuilding(c *fiber.Ctx) error { measurements := make([]*model.Measurement, 0, len(r.Measurements)) for _, measurement := range r.Measurements { - m, err := model.NewMeasurement(measurement.Unit, measurement.Value, measurement.Type, measurement.TypeHeader) + m, err := measurement.ToModel() if err != nil { return c.Status(400).JSON(fiber.Map{ "error": err.Error(),