Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions apps/workspace-engine/pkg/events/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package events

import (
"encoding/json"
"fmt"
)

func parsePayload(payload any, target any) error {
payloadBytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal payload: %w", err)
}

if err := json.Unmarshal(payloadBytes, target); err != nil {
return fmt.Errorf("failed to unmarshal payload: %w", err)
}

return nil
}
66 changes: 66 additions & 0 deletions apps/workspace-engine/pkg/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package events

import (
"context"
"encoding/json"
"fmt"

"workspace-engine/pkg/logger"

"github.com/charmbracelet/log"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type EventType string

const (
ResourceCreated EventType = "resource.created"
ResourceUpdated EventType = "resource.updated"
ResourceDeleted EventType = "resource.deleted"
)

type BaseEvent struct {
EventType EventType `json:"eventType"`
WorkspaceID string `json:"workspaceId"`
EventID string `json:"eventId"`
Timestamp float64 `json:"timestamp"`
Source string `json:"source"`
}

type RawEvent struct {
BaseEvent
Payload any `json:"payload,omitempty"`
}

type EventHandler func(ctx context.Context, event RawEvent) error
type EventHandlerRegistry map[EventType]EventHandler

type MessageReader struct {
handlers EventHandlerRegistry
log *log.Logger
}

func NewMessageReader() *MessageReader {
return &MessageReader{
handlers: EventHandlerRegistry{
ResourceCreated: handleResourceCreatedEvent,
ResourceUpdated: handleResourceUpdatedEvent,
ResourceDeleted: handleResourceDeletedEvent,
},
log: logger.Get(),
}
}

func (p *MessageReader) ReadMessage(ctx context.Context, msg *kafka.Message) error {
var rawEvent RawEvent
if err := json.Unmarshal(msg.Value, &rawEvent); err != nil {
return fmt.Errorf("failed to unmarshal event: %w", err)
}

handler, ok := p.handlers[rawEvent.EventType]
if !ok {
return fmt.Errorf("no handler found for event type: %s", rawEvent.EventType)
}

return handler(ctx, rawEvent)
}
86 changes: 86 additions & 0 deletions apps/workspace-engine/pkg/events/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package events

import (
"context"
"fmt"
"time"
"workspace-engine/pkg/logger"
)

type Resource struct {
ID string `json:"id"`
WorkspaceID string `json:"workspaceId"`
Name string `json:"name"`
Identifier string `json:"identifier"`
Kind string `json:"kind"`
Version string `json:"version"`
Config map[string]interface{} `json:"config"`
Metadata map[string]string `json:"metadata"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
DeletedAt *time.Time `json:"deletedAt,omitempty"`
ProviderID *string `json:"providerId,omitempty"`
}

type ResourceCreatedEvent struct {
BaseEvent
Payload Resource `json:"payload"`
}

func handleResourceCreatedEvent(_ context.Context, event RawEvent) error {
log := logger.Get()

log.Info("Handling resource created event", "event", event)

var resourceCreatedEvent ResourceCreatedEvent
if err := parsePayload(event.Payload, &resourceCreatedEvent); err != nil {
return fmt.Errorf("failed to parse payload: %w", err)
}

log.Info("Resource created event", "event", resourceCreatedEvent)

return nil
}

type ResourceUpdatedEvent struct {
BaseEvent
Payload struct {
Current Resource `json:"current"`
Previous Resource `json:"previous"`
}
}

func handleResourceUpdatedEvent(_ context.Context, event RawEvent) error {
log := logger.Get()

log.Info("Handling resource updated event", "event", event)

var resourceUpdatedEvent ResourceUpdatedEvent
if err := parsePayload(event.Payload, &resourceUpdatedEvent); err != nil {
return fmt.Errorf("failed to parse payload: %w", err)
}

log.Info("Resource updated event", "event", resourceUpdatedEvent)

return nil
}

type ResourceDeletedEvent struct {
BaseEvent
Payload Resource `json:"payload"`
}

func handleResourceDeletedEvent(_ context.Context, event RawEvent) error {
log := logger.Get()

log.Info("Handling resource deleted event", "event", event)

var resourceDeletedEvent ResourceDeletedEvent
if err := parsePayload(event.Payload, &resourceDeletedEvent); err != nil {
return fmt.Errorf("failed to parse payload: %w", err)
}

log.Info("Resource deleted event", "event", resourceDeletedEvent)

return nil
}
22 changes: 5 additions & 17 deletions apps/workspace-engine/pkg/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package kafka

import (
"context"
"encoding/json"
"os"
"time"

"workspace-engine/pkg/events"
"workspace-engine/pkg/logger"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand All @@ -16,16 +16,6 @@ const (
Topic = "ctrlplane-events"
)

// Event represents the structure of events from your TypeScript code
type Event struct {
WorkspaceID string `json:"workspaceId"`
EventType string `json:"eventType"`
EventID string `json:"eventId"`
Timestamp float64 `json:"timestamp"`
Source string `json:"source"`
Payload map[string]interface{} `json:"payload"`
}

func RunConsumer(ctx context.Context) error {
log := logger.Get()

Expand Down Expand Up @@ -56,6 +46,8 @@ func RunConsumer(ctx context.Context) error {

log.Info("Started Kafka consumer for ctrlplane-events")

reader := events.NewMessageReader()

for {
select {
case <-ctx.Done():
Expand All @@ -74,15 +66,11 @@ func RunConsumer(ctx context.Context) error {
continue
}

var event Event
err = json.Unmarshal(msg.Value, &event)
if err != nil {
log.Error("Failed to unmarshal event", "error", err)
if err := reader.ReadMessage(ctx, msg); err != nil {
log.Error("Failed to read message", "error", err)
continue
}

log.Info("Received event", "event", event)

// NOTE: we do not commit the message. if the process ends and we need to rebuild the state,
// we need to start from the beginning of the topic.
}
Expand Down