From 18960da78ae3d458c0f40c59262d75150e63e02d Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Mon, 4 Aug 2025 14:52:28 -0700 Subject: [PATCH 1/3] init --- apps/workspace-engine/pkg/events/events.go | 21 ++++++++++++ apps/workspace-engine/pkg/events/resource.go | 36 ++++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 apps/workspace-engine/pkg/events/events.go create mode 100644 apps/workspace-engine/pkg/events/resource.go diff --git a/apps/workspace-engine/pkg/events/events.go b/apps/workspace-engine/pkg/events/events.go new file mode 100644 index 000000000..2ebc65d76 --- /dev/null +++ b/apps/workspace-engine/pkg/events/events.go @@ -0,0 +1,21 @@ +package events + +type EventType string + +const ( + ResourceCreated EventType = "resource.created" + ResourceUpdated EventType = "resource.updated" +) + +type BaseEvent struct { + EventType EventType `json:"eventType"` + WorkspaceID string `json:"workspaceId"` + EventID string `json:"eventId"` + Timestamp float64 `json:"timestamp"` + Source string `json:"source"` +} + +type RawEvent struct { + BaseEvent + Payload map[string]any `json:"payload,omitempty"` +} diff --git a/apps/workspace-engine/pkg/events/resource.go b/apps/workspace-engine/pkg/events/resource.go new file mode 100644 index 000000000..b55937aca --- /dev/null +++ b/apps/workspace-engine/pkg/events/resource.go @@ -0,0 +1,36 @@ +package events + +import "time" + +type Resource struct { + ID string `json:"id"` + WorkspaceID string `json:"workspaceId"` + Name string `json:"name"` + Identifier string `json:"identifier"` + Kind string `json:"kind"` + Version string `json:"version"` + Config map[string]interface{} `json:"config"` + Metadata map[string]string `json:"metadata"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` + DeletedAt *time.Time `json:"deletedAt,omitempty"` + ProviderID *string `json:"providerId,omitempty"` +} + +type ResourceCreatedEvent struct { + BaseEvent + Payload Resource `json:"payload"` +} + +type ResourceUpdatedEvent struct { + BaseEvent + Payload struct { + Current Resource `json:"current"` + Previous Resource `json:"previous"` + } +} + +type ResourceDeletedEvent struct { + BaseEvent + Payload Resource `json:"payload"` +} From ebafc11cf6d6b06b7c9d1db372207fd42d67c106 Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Tue, 5 Aug 2025 14:37:06 -0700 Subject: [PATCH 2/3] more change --- apps/workspace-engine/pkg/events/common.go | 19 +++++++ apps/workspace-engine/pkg/events/events.go | 40 ++++++++++++++- apps/workspace-engine/pkg/events/resource.go | 52 +++++++++++++++++++- apps/workspace-engine/pkg/kafka/consumer.go | 24 +++++---- 4 files changed, 120 insertions(+), 15 deletions(-) create mode 100644 apps/workspace-engine/pkg/events/common.go diff --git a/apps/workspace-engine/pkg/events/common.go b/apps/workspace-engine/pkg/events/common.go new file mode 100644 index 000000000..8a227ebae --- /dev/null +++ b/apps/workspace-engine/pkg/events/common.go @@ -0,0 +1,19 @@ +package events + +import ( + "encoding/json" + "fmt" +) + +func parsePayload(payload any, target any) error { + payloadBytes, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal payload: %w", err) + } + + if err := json.Unmarshal(payloadBytes, target); err != nil { + return fmt.Errorf("failed to unmarshal payload: %w", err) + } + + return nil +} diff --git a/apps/workspace-engine/pkg/events/events.go b/apps/workspace-engine/pkg/events/events.go index 2ebc65d76..cdf49a8ab 100644 --- a/apps/workspace-engine/pkg/events/events.go +++ b/apps/workspace-engine/pkg/events/events.go @@ -1,10 +1,20 @@ package events +import ( + "context" + "fmt" + + "workspace-engine/pkg/logger" + + "github.com/charmbracelet/log" +) + type EventType string const ( ResourceCreated EventType = "resource.created" ResourceUpdated EventType = "resource.updated" + ResourceDeleted EventType = "resource.deleted" ) type BaseEvent struct { @@ -17,5 +27,33 @@ type BaseEvent struct { type RawEvent struct { BaseEvent - Payload map[string]any `json:"payload,omitempty"` + Payload any `json:"payload,omitempty"` +} + +type EventHandler func(ctx context.Context, event RawEvent) error +type EventHandlerRegistry map[EventType]EventHandler + +type EventProcessor struct { + handlers EventHandlerRegistry + log *log.Logger +} + +func NewEventProcessor() *EventProcessor { + return &EventProcessor{ + handlers: EventHandlerRegistry{ + ResourceCreated: handleResourceCreatedEvent, + ResourceUpdated: handleResourceUpdatedEvent, + ResourceDeleted: handleResourceDeletedEvent, + }, + log: logger.Get(), + } +} + +func (p *EventProcessor) HandleEvent(ctx context.Context, event RawEvent) error { + handler, ok := p.handlers[event.EventType] + if !ok { + return fmt.Errorf("no handler found for event type: %s", event.EventType) + } + + return handler(ctx, event) } diff --git a/apps/workspace-engine/pkg/events/resource.go b/apps/workspace-engine/pkg/events/resource.go index b55937aca..ae9272a9a 100644 --- a/apps/workspace-engine/pkg/events/resource.go +++ b/apps/workspace-engine/pkg/events/resource.go @@ -1,6 +1,11 @@ package events -import "time" +import ( + "context" + "fmt" + "time" + "workspace-engine/pkg/logger" +) type Resource struct { ID string `json:"id"` @@ -22,6 +27,21 @@ type ResourceCreatedEvent struct { Payload Resource `json:"payload"` } +func handleResourceCreatedEvent(_ context.Context, event RawEvent) error { + log := logger.Get() + + log.Info("Handling resource created event", "event", event) + + var resourceCreatedEvent ResourceCreatedEvent + if err := parsePayload(event.Payload, &resourceCreatedEvent); err != nil { + return fmt.Errorf("failed to parse payload: %w", err) + } + + log.Info("Resource created event", "event", resourceCreatedEvent) + + return nil +} + type ResourceUpdatedEvent struct { BaseEvent Payload struct { @@ -30,7 +50,37 @@ type ResourceUpdatedEvent struct { } } +func handleResourceUpdatedEvent(_ context.Context, event RawEvent) error { + log := logger.Get() + + log.Info("Handling resource updated event", "event", event) + + var resourceUpdatedEvent ResourceUpdatedEvent + if err := parsePayload(event.Payload, &resourceUpdatedEvent); err != nil { + return fmt.Errorf("failed to parse payload: %w", err) + } + + log.Info("Resource updated event", "event", resourceUpdatedEvent) + + return nil +} + type ResourceDeletedEvent struct { BaseEvent Payload Resource `json:"payload"` } + +func handleResourceDeletedEvent(_ context.Context, event RawEvent) error { + log := logger.Get() + + log.Info("Handling resource deleted event", "event", event) + + var resourceDeletedEvent ResourceDeletedEvent + if err := parsePayload(event.Payload, &resourceDeletedEvent); err != nil { + return fmt.Errorf("failed to parse payload: %w", err) + } + + log.Info("Resource deleted event", "event", resourceDeletedEvent) + + return nil +} diff --git a/apps/workspace-engine/pkg/kafka/consumer.go b/apps/workspace-engine/pkg/kafka/consumer.go index 53193dbdf..dff5023c4 100644 --- a/apps/workspace-engine/pkg/kafka/consumer.go +++ b/apps/workspace-engine/pkg/kafka/consumer.go @@ -6,6 +6,7 @@ import ( "os" "time" + "workspace-engine/pkg/events" "workspace-engine/pkg/logger" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -16,16 +17,6 @@ const ( Topic = "ctrlplane-events" ) -// Event represents the structure of events from your TypeScript code -type Event struct { - WorkspaceID string `json:"workspaceId"` - EventType string `json:"eventType"` - EventID string `json:"eventId"` - Timestamp float64 `json:"timestamp"` - Source string `json:"source"` - Payload map[string]interface{} `json:"payload"` -} - func RunConsumer(ctx context.Context) error { log := logger.Get() @@ -56,6 +47,8 @@ func RunConsumer(ctx context.Context) error { log.Info("Started Kafka consumer for ctrlplane-events") + processor := events.NewEventProcessor() + for { select { case <-ctx.Done(): @@ -74,14 +67,19 @@ func RunConsumer(ctx context.Context) error { continue } - var event Event - err = json.Unmarshal(msg.Value, &event) + var rawEvent events.RawEvent + err = json.Unmarshal(msg.Value, &rawEvent) if err != nil { log.Error("Failed to unmarshal event", "error", err) continue } - log.Info("Received event", "event", event) + log.Info("Received event", "event", rawEvent) + + if err := processor.HandleEvent(ctx, rawEvent); err != nil { + log.Error("Failed to handle event", "error", err) + continue + } // NOTE: we do not commit the message. if the process ends and we need to rebuild the state, // we need to start from the beginning of the topic. From 92665fa6a2be9e40543fb5516c83c7a98c24ef87 Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Tue, 5 Aug 2025 15:19:04 -0700 Subject: [PATCH 3/3] cleanup --- apps/workspace-engine/pkg/events/events.go | 21 ++++++++++++++------- apps/workspace-engine/pkg/kafka/consumer.go | 16 +++------------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/apps/workspace-engine/pkg/events/events.go b/apps/workspace-engine/pkg/events/events.go index cdf49a8ab..09dcaa620 100644 --- a/apps/workspace-engine/pkg/events/events.go +++ b/apps/workspace-engine/pkg/events/events.go @@ -2,11 +2,13 @@ package events import ( "context" + "encoding/json" "fmt" "workspace-engine/pkg/logger" "github.com/charmbracelet/log" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) type EventType string @@ -33,13 +35,13 @@ type RawEvent struct { type EventHandler func(ctx context.Context, event RawEvent) error type EventHandlerRegistry map[EventType]EventHandler -type EventProcessor struct { +type MessageReader struct { handlers EventHandlerRegistry log *log.Logger } -func NewEventProcessor() *EventProcessor { - return &EventProcessor{ +func NewMessageReader() *MessageReader { + return &MessageReader{ handlers: EventHandlerRegistry{ ResourceCreated: handleResourceCreatedEvent, ResourceUpdated: handleResourceUpdatedEvent, @@ -49,11 +51,16 @@ func NewEventProcessor() *EventProcessor { } } -func (p *EventProcessor) HandleEvent(ctx context.Context, event RawEvent) error { - handler, ok := p.handlers[event.EventType] +func (p *MessageReader) ReadMessage(ctx context.Context, msg *kafka.Message) error { + var rawEvent RawEvent + if err := json.Unmarshal(msg.Value, &rawEvent); err != nil { + return fmt.Errorf("failed to unmarshal event: %w", err) + } + + handler, ok := p.handlers[rawEvent.EventType] if !ok { - return fmt.Errorf("no handler found for event type: %s", event.EventType) + return fmt.Errorf("no handler found for event type: %s", rawEvent.EventType) } - return handler(ctx, event) + return handler(ctx, rawEvent) } diff --git a/apps/workspace-engine/pkg/kafka/consumer.go b/apps/workspace-engine/pkg/kafka/consumer.go index dff5023c4..5195da8b9 100644 --- a/apps/workspace-engine/pkg/kafka/consumer.go +++ b/apps/workspace-engine/pkg/kafka/consumer.go @@ -2,7 +2,6 @@ package kafka import ( "context" - "encoding/json" "os" "time" @@ -47,7 +46,7 @@ func RunConsumer(ctx context.Context) error { log.Info("Started Kafka consumer for ctrlplane-events") - processor := events.NewEventProcessor() + reader := events.NewMessageReader() for { select { @@ -67,17 +66,8 @@ func RunConsumer(ctx context.Context) error { continue } - var rawEvent events.RawEvent - err = json.Unmarshal(msg.Value, &rawEvent) - if err != nil { - log.Error("Failed to unmarshal event", "error", err) - continue - } - - log.Info("Received event", "event", rawEvent) - - if err := processor.HandleEvent(ctx, rawEvent); err != nil { - log.Error("Failed to handle event", "error", err) + if err := reader.ReadMessage(ctx, msg); err != nil { + log.Error("Failed to read message", "error", err) continue }