diff --git a/apps/workspace-engine/pkg/events/common.go b/apps/workspace-engine/pkg/events/common.go new file mode 100644 index 000000000..8a227ebae --- /dev/null +++ b/apps/workspace-engine/pkg/events/common.go @@ -0,0 +1,19 @@ +package events + +import ( + "encoding/json" + "fmt" +) + +func parsePayload(payload any, target any) error { + payloadBytes, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal payload: %w", err) + } + + if err := json.Unmarshal(payloadBytes, target); err != nil { + return fmt.Errorf("failed to unmarshal payload: %w", err) + } + + return nil +} diff --git a/apps/workspace-engine/pkg/events/events.go b/apps/workspace-engine/pkg/events/events.go new file mode 100644 index 000000000..09dcaa620 --- /dev/null +++ b/apps/workspace-engine/pkg/events/events.go @@ -0,0 +1,66 @@ +package events + +import ( + "context" + "encoding/json" + "fmt" + + "workspace-engine/pkg/logger" + + "github.com/charmbracelet/log" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +type EventType string + +const ( + ResourceCreated EventType = "resource.created" + ResourceUpdated EventType = "resource.updated" + ResourceDeleted EventType = "resource.deleted" +) + +type BaseEvent struct { + EventType EventType `json:"eventType"` + WorkspaceID string `json:"workspaceId"` + EventID string `json:"eventId"` + Timestamp float64 `json:"timestamp"` + Source string `json:"source"` +} + +type RawEvent struct { + BaseEvent + Payload any `json:"payload,omitempty"` +} + +type EventHandler func(ctx context.Context, event RawEvent) error +type EventHandlerRegistry map[EventType]EventHandler + +type MessageReader struct { + handlers EventHandlerRegistry + log *log.Logger +} + +func NewMessageReader() *MessageReader { + return &MessageReader{ + handlers: EventHandlerRegistry{ + ResourceCreated: handleResourceCreatedEvent, + ResourceUpdated: handleResourceUpdatedEvent, + ResourceDeleted: handleResourceDeletedEvent, + }, + log: logger.Get(), + } +} + +func (p *MessageReader) ReadMessage(ctx context.Context, msg *kafka.Message) error { + var rawEvent RawEvent + if err := json.Unmarshal(msg.Value, &rawEvent); err != nil { + return fmt.Errorf("failed to unmarshal event: %w", err) + } + + handler, ok := p.handlers[rawEvent.EventType] + if !ok { + return fmt.Errorf("no handler found for event type: %s", rawEvent.EventType) + } + + return handler(ctx, rawEvent) +} diff --git a/apps/workspace-engine/pkg/events/resource.go b/apps/workspace-engine/pkg/events/resource.go new file mode 100644 index 000000000..ae9272a9a --- /dev/null +++ b/apps/workspace-engine/pkg/events/resource.go @@ -0,0 +1,86 @@ +package events + +import ( + "context" + "fmt" + "time" + "workspace-engine/pkg/logger" +) + +type Resource struct { + ID string `json:"id"` + WorkspaceID string `json:"workspaceId"` + Name string `json:"name"` + Identifier string `json:"identifier"` + Kind string `json:"kind"` + Version string `json:"version"` + Config map[string]interface{} `json:"config"` + Metadata map[string]string `json:"metadata"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` + DeletedAt *time.Time `json:"deletedAt,omitempty"` + ProviderID *string `json:"providerId,omitempty"` +} + +type ResourceCreatedEvent struct { + BaseEvent + Payload Resource `json:"payload"` +} + +func handleResourceCreatedEvent(_ context.Context, event RawEvent) error { + log := logger.Get() + + log.Info("Handling resource created event", "event", event) + + var resourceCreatedEvent ResourceCreatedEvent + if err := parsePayload(event.Payload, &resourceCreatedEvent); err != nil { + return fmt.Errorf("failed to parse payload: %w", err) + } + + log.Info("Resource created event", "event", resourceCreatedEvent) + + return nil +} + +type ResourceUpdatedEvent struct { + BaseEvent + Payload struct { + Current Resource `json:"current"` + Previous Resource `json:"previous"` + } +} + +func handleResourceUpdatedEvent(_ context.Context, event RawEvent) error { + log := logger.Get() + + log.Info("Handling resource updated event", "event", event) + + var resourceUpdatedEvent ResourceUpdatedEvent + if err := parsePayload(event.Payload, &resourceUpdatedEvent); err != nil { + return fmt.Errorf("failed to parse payload: %w", err) + } + + log.Info("Resource updated event", "event", resourceUpdatedEvent) + + return nil +} + +type ResourceDeletedEvent struct { + BaseEvent + Payload Resource `json:"payload"` +} + +func handleResourceDeletedEvent(_ context.Context, event RawEvent) error { + log := logger.Get() + + log.Info("Handling resource deleted event", "event", event) + + var resourceDeletedEvent ResourceDeletedEvent + if err := parsePayload(event.Payload, &resourceDeletedEvent); err != nil { + return fmt.Errorf("failed to parse payload: %w", err) + } + + log.Info("Resource deleted event", "event", resourceDeletedEvent) + + return nil +} diff --git a/apps/workspace-engine/pkg/kafka/consumer.go b/apps/workspace-engine/pkg/kafka/consumer.go index 53193dbdf..5195da8b9 100644 --- a/apps/workspace-engine/pkg/kafka/consumer.go +++ b/apps/workspace-engine/pkg/kafka/consumer.go @@ -2,10 +2,10 @@ package kafka import ( "context" - "encoding/json" "os" "time" + "workspace-engine/pkg/events" "workspace-engine/pkg/logger" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -16,16 +16,6 @@ const ( Topic = "ctrlplane-events" ) -// Event represents the structure of events from your TypeScript code -type Event struct { - WorkspaceID string `json:"workspaceId"` - EventType string `json:"eventType"` - EventID string `json:"eventId"` - Timestamp float64 `json:"timestamp"` - Source string `json:"source"` - Payload map[string]interface{} `json:"payload"` -} - func RunConsumer(ctx context.Context) error { log := logger.Get() @@ -56,6 +46,8 @@ func RunConsumer(ctx context.Context) error { log.Info("Started Kafka consumer for ctrlplane-events") + reader := events.NewMessageReader() + for { select { case <-ctx.Done(): @@ -74,15 +66,11 @@ func RunConsumer(ctx context.Context) error { continue } - var event Event - err = json.Unmarshal(msg.Value, &event) - if err != nil { - log.Error("Failed to unmarshal event", "error", err) + if err := reader.ReadMessage(ctx, msg); err != nil { + log.Error("Failed to read message", "error", err) continue } - log.Info("Received event", "event", event) - // NOTE: we do not commit the message. if the process ends and we need to rebuild the state, // we need to start from the beginning of the topic. }