Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/workspace-engine/pkg/db/changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package db
import (
"context"
"fmt"
"workspace-engine/pkg/changeset"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/workspace/changeset"

"github.com/jackc/pgx/v5"
)
Expand Down
170 changes: 97 additions & 73 deletions apps/workspace-engine/pkg/db/workspace_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package db
import (
"context"
"fmt"
"workspace-engine/pkg/workspace"
"workspace-engine/pkg/oapi"

"github.com/charmbracelet/log"
"go.opentelemetry.io/otel"
Expand All @@ -12,139 +12,163 @@ import (

var tracer = otel.Tracer("db")

func loadSystems(ctx context.Context, ws *workspace.Workspace) error {
type InitialWorkspaceState struct {
systems []*oapi.System
resources []*oapi.Resource
deployments []*oapi.Deployment
deploymentVersions []*oapi.DeploymentVersion
deploymentVariables []*oapi.DeploymentVariable
environments []*oapi.Environment
policies []*oapi.Policy
jobAgents []*oapi.JobAgent
relationships []*oapi.RelationshipRule
}

func (i *InitialWorkspaceState) Systems() []*oapi.System {
return i.systems
}

func (i *InitialWorkspaceState) Resources() []*oapi.Resource {
return i.resources
}

func (i *InitialWorkspaceState) Deployments() []*oapi.Deployment {
return i.deployments
}

func (i *InitialWorkspaceState) DeploymentVersions() []*oapi.DeploymentVersion {
return i.deploymentVersions
}

func (i *InitialWorkspaceState) DeploymentVariables() []*oapi.DeploymentVariable {
return i.deploymentVariables
}

func (i *InitialWorkspaceState) Environments() []*oapi.Environment {
return i.environments
}

func (i *InitialWorkspaceState) Policies() []*oapi.Policy {
return i.policies
}

func (i *InitialWorkspaceState) JobAgents() []*oapi.JobAgent {
return i.jobAgents
}

func (i *InitialWorkspaceState) Relationships() []*oapi.RelationshipRule {
return i.relationships
}

func loadSystems(ctx context.Context, workspaceID string) ([]*oapi.System, error) {
ctx, span := tracer.Start(ctx, "loadSystems")
defer span.End()

dbSystems, err := getSystems(ctx, ws.ID)
dbSystems, err := getSystems(ctx, workspaceID)
if err != nil {
return fmt.Errorf("failed to get systems: %w", err)
}
for _, system := range dbSystems {
ws.Systems().Upsert(ctx, system)
return nil, fmt.Errorf("failed to get systems: %w", err)
}
span.SetAttributes(attribute.Int("count", len(dbSystems)))
return nil
return dbSystems, nil
}

func loadResources(ctx context.Context, ws *workspace.Workspace) error {
func loadResources(ctx context.Context, workspaceID string) ([]*oapi.Resource, error) {
ctx, span := tracer.Start(ctx, "loadResources")
defer span.End()

dbResources, err := getResources(ctx, ws.ID)
dbResources, err := getResources(ctx, workspaceID)
if err != nil {
return fmt.Errorf("failed to get resources: %w", err)
}
for _, resource := range dbResources {
ws.Resources().Upsert(ctx, resource)
return nil, fmt.Errorf("failed to get resources: %w", err)
}
span.SetAttributes(attribute.Int("count", len(dbResources)))
return nil
return dbResources, nil
}

func loadDeployments(ctx context.Context, ws *workspace.Workspace) error {
func loadDeployments(ctx context.Context, workspaceID string) ([]*oapi.Deployment, error) {
ctx, span := tracer.Start(ctx, "loadDeployments")
defer span.End()

dbDeployments, err := getDeployments(ctx, ws.ID)
dbDeployments, err := getDeployments(ctx, workspaceID)
if err != nil {
return fmt.Errorf("failed to get deployments: %w", err)
}
for _, deployment := range dbDeployments {
ws.Deployments().Upsert(ctx, deployment)
return nil, fmt.Errorf("failed to get deployments: %w", err)
}
span.SetAttributes(attribute.Int("count", len(dbDeployments)))
return nil
return dbDeployments, nil
}

func loadDeploymentVersions(ctx context.Context, ws *workspace.Workspace) error {
func loadDeploymentVersions(ctx context.Context, workspaceID string) ([]*oapi.DeploymentVersion, error) {
ctx, span := tracer.Start(ctx, "loadDeploymentVersions")
defer span.End()

dbDeploymentVersions, err := getDeploymentVersions(ctx, ws.ID)
dbDeploymentVersions, err := getDeploymentVersions(ctx, workspaceID)
if err != nil {
return fmt.Errorf("failed to get deployment versions: %w", err)
}
for _, deploymentVersion := range dbDeploymentVersions {
ws.DeploymentVersions().Upsert(ctx, deploymentVersion.DeploymentId, deploymentVersion)
return nil, fmt.Errorf("failed to get deployment versions: %w", err)
}
span.SetAttributes(attribute.Int("count", len(dbDeploymentVersions)))
return nil
return dbDeploymentVersions, nil
}

func loadDeploymentVariables(ctx context.Context, ws *workspace.Workspace) error {
func loadDeploymentVariables(ctx context.Context, workspaceID string) ([]*oapi.DeploymentVariable, error) {
ctx, span := tracer.Start(ctx, "loadDeploymentVariables")
defer span.End()

dbDeploymentVariables, err := getDeploymentVariables(ctx, ws.ID)
dbDeploymentVariables, err := getDeploymentVariables(ctx, workspaceID)
if err != nil {
return fmt.Errorf("failed to get deployment variables: %w", err)
}
for _, deploymentVariable := range dbDeploymentVariables {
ws.Deployments().Variables(deploymentVariable.DeploymentId)[deploymentVariable.Key] = deploymentVariable
return nil, fmt.Errorf("failed to get deployment variables: %w", err)
}
span.SetAttributes(attribute.Int("count", len(dbDeploymentVariables)))
return nil
return dbDeploymentVariables, nil
}

func loadEnvironments(ctx context.Context, ws *workspace.Workspace) error {
func loadEnvironments(ctx context.Context, workspaceID string) ([]*oapi.Environment, error) {
ctx, span := tracer.Start(ctx, "loadEnvironments")
defer span.End()

dbEnvironments, err := getEnvironments(ctx, ws.ID)
dbEnvironments, err := getEnvironments(ctx, workspaceID)
if err != nil {
return fmt.Errorf("failed to get environments: %w", err)
}
for _, environment := range dbEnvironments {
ws.Environments().Upsert(ctx, environment)
return nil, fmt.Errorf("failed to get environments: %w", err)
}
span.SetAttributes(attribute.Int("count", len(dbEnvironments)))
return nil
return dbEnvironments, nil
}

func loadPolicies(ctx context.Context, ws *workspace.Workspace) error {
func loadPolicies(ctx context.Context, workspaceID string) ([]*oapi.Policy, error) {
ctx, span := tracer.Start(ctx, "loadPolicies")
defer span.End()

dbPolicies, err := getPolicies(ctx, ws.ID)
dbPolicies, err := getPolicies(ctx, workspaceID)
if err != nil {
return fmt.Errorf("failed to get policies: %w", err)
}
for _, policy := range dbPolicies {
ws.Policies().Upsert(ctx, policy)
return nil, fmt.Errorf("failed to get policies: %w", err)
}
span.SetAttributes(attribute.Int("count", len(dbPolicies)))
return nil
return dbPolicies, nil
}

func loadJobAgents(ctx context.Context, ws *workspace.Workspace) error {
func loadJobAgents(ctx context.Context, workspaceID string) ([]*oapi.JobAgent, error) {
ctx, span := tracer.Start(ctx, "loadJobAgents")
defer span.End()

dbJobAgents, err := getJobAgents(ctx, ws.ID)
dbJobAgents, err := getJobAgents(ctx, workspaceID)
if err != nil {
return fmt.Errorf("failed to get job agents: %w", err)
}
for _, jobAgent := range dbJobAgents {
ws.JobAgents().Upsert(ctx, jobAgent)
return nil, fmt.Errorf("failed to get job agents: %w", err)
}
span.SetAttributes(attribute.Int("count", len(dbJobAgents)))
return nil
return dbJobAgents, nil
}

func loadRelationships(ctx context.Context, ws *workspace.Workspace) error {
func loadRelationships(ctx context.Context, workspaceID string) ([]*oapi.RelationshipRule, error) {
ctx, span := tracer.Start(ctx, "loadRelationships")
defer span.End()

dbRelationships, err := getRelationships(ctx, ws.ID)
dbRelationships, err := getRelationships(ctx, workspaceID)
if err != nil {
return fmt.Errorf("failed to get relationships: %w", err)
return nil, fmt.Errorf("failed to get relationships: %w", err)
}
span.SetAttributes(attribute.Int("count", len(dbRelationships)))
return nil
return dbRelationships, nil
}

func LoadWorkspace(ctx context.Context, workspaceID string) (*workspace.Workspace, error) {
func LoadWorkspace(ctx context.Context, workspaceID string) (initialWorkspaceState *InitialWorkspaceState, err error) {
ctx, span := tracer.Start(ctx, "LoadWorkspace")
defer span.End()

Expand All @@ -155,43 +179,43 @@ func LoadWorkspace(ctx context.Context, workspaceID string) (*workspace.Workspac
}
defer db.Release()

ws := workspace.New(workspaceID)
initialWorkspaceState = &InitialWorkspaceState{}

if err := loadSystems(ctx, ws); err != nil {
if initialWorkspaceState.systems, err = loadSystems(ctx, workspaceID); err != nil {
return nil, fmt.Errorf("failed to load systems: %w", err)
}

if err := loadResources(ctx, ws); err != nil {
if initialWorkspaceState.resources, err = loadResources(ctx, workspaceID); err != nil {
return nil, fmt.Errorf("failed to load resources: %w", err)
}

if err := loadDeployments(ctx, ws); err != nil {
if initialWorkspaceState.deployments, err = loadDeployments(ctx, workspaceID); err != nil {
return nil, fmt.Errorf("failed to load deployments: %w", err)
}

if err := loadDeploymentVersions(ctx, ws); err != nil {
if initialWorkspaceState.deploymentVersions, err = loadDeploymentVersions(ctx, workspaceID); err != nil {
return nil, fmt.Errorf("failed to load deployment versions: %w", err)
}

if err := loadDeploymentVariables(ctx, ws); err != nil {
if initialWorkspaceState.deploymentVariables, err = loadDeploymentVariables(ctx, workspaceID); err != nil {
return nil, fmt.Errorf("failed to load deployment variables: %w", err)
}

if err := loadEnvironments(ctx, ws); err != nil {
if initialWorkspaceState.environments, err = loadEnvironments(ctx, workspaceID); err != nil {
return nil, fmt.Errorf("failed to load environments: %w", err)
}

if err := loadPolicies(ctx, ws); err != nil {
if initialWorkspaceState.policies, err = loadPolicies(ctx, workspaceID); err != nil {
return nil, fmt.Errorf("failed to load policies: %w", err)
}

if err := loadJobAgents(ctx, ws); err != nil {
if initialWorkspaceState.jobAgents, err = loadJobAgents(ctx, workspaceID); err != nil {
return nil, fmt.Errorf("failed to load job agents: %w", err)
}

if err := loadRelationships(ctx, ws); err != nil {
if initialWorkspaceState.relationships, err = loadRelationships(ctx, workspaceID); err != nil {
return nil, fmt.Errorf("failed to load relationships: %w", err)
}

return ws, nil
return initialWorkspaceState, nil
}
10 changes: 4 additions & 6 deletions apps/workspace-engine/pkg/events/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"workspace-engine/pkg/changeset"
"workspace-engine/pkg/db"
"workspace-engine/pkg/workspace"
"workspace-engine/pkg/workspace/changeset"

"github.com/charmbracelet/log"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand Down Expand Up @@ -141,18 +141,16 @@ func (el *EventListener) ListenAndRoute(ctx context.Context, msg *kafka.Message)
if wsExists {
ws = workspace.GetWorkspace(rawEvent.WorkspaceID)
}

changeSet := changeset.NewChangeSet()
if !wsExists {
fullWs, err := db.LoadWorkspace(ctx, rawEvent.WorkspaceID)
if err != nil {
ws = workspace.New(rawEvent.WorkspaceID)
if err := loadWorkspaceWithInitialState(ctx, ws); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to load workspace")
log.Error("Failed to load workspace", "error", err, "workspaceID", rawEvent.WorkspaceID)
return fmt.Errorf("failed to load workspace: %w", err)
}
workspace.Set(rawEvent.WorkspaceID, fullWs)
ws = fullWs
workspace.Set(rawEvent.WorkspaceID, ws)
changeSet.IsInitialLoad = true
}
ctx = changeset.WithChangeSet(ctx, changeSet)
Expand Down
56 changes: 56 additions & 0 deletions apps/workspace-engine/pkg/events/handler/workspace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package handler

import (
"context"
"workspace-engine/pkg/db"
"workspace-engine/pkg/workspace"
)

func loadWorkspaceWithInitialState(ctx context.Context, ws *workspace.Workspace) error {
initialWorkspaceState, err := db.LoadWorkspace(ctx, ws.ID)
if err != nil {
return err
}

for _, system := range initialWorkspaceState.Systems() {
if err := ws.Store().Systems.Upsert(ctx, system); err != nil {
return err
}
}
for _, resource := range initialWorkspaceState.Resources() {
if _, err := ws.Store().Resources.Upsert(ctx, resource); err != nil {
return err
}
}
for _, deployment := range initialWorkspaceState.Deployments() {
if err := ws.Store().Deployments.Upsert(ctx, deployment); err != nil {
return err
}
}
for _, deploymentVersion := range initialWorkspaceState.DeploymentVersions() {
ws.Store().DeploymentVersions.Upsert(ctx, deploymentVersion.Id, deploymentVersion)
}
for _, deploymentVariable := range initialWorkspaceState.DeploymentVariables() {
ws.Store().DeploymentVariables.Upsert(ctx, deploymentVariable.Id, deploymentVariable)
}
for _, environment := range initialWorkspaceState.Environments() {
if err := ws.Store().Environments.Upsert(ctx, environment); err != nil {
return err
}
}
for _, policy := range initialWorkspaceState.Policies() {
if err := ws.Store().Policies.Upsert(ctx, policy); err != nil {
return err
}
}
for _, jobAgent := range initialWorkspaceState.JobAgents() {
ws.Store().JobAgents.Upsert(ctx, jobAgent)
}
for _, relationship := range initialWorkspaceState.Relationships() {
if err := ws.Store().Relationships.Upsert(ctx, relationship); err != nil {
return err
}
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package store

import (
"context"
"workspace-engine/pkg/changeset"
"workspace-engine/pkg/cmap"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/workspace/changeset"
"workspace-engine/pkg/workspace/store/repository"
)

Expand Down
Loading
Loading