Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions database/sql/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,15 +495,50 @@ func (s *sqlDatabase) GetJobByID(_ context.Context, jobID int64) (params.Job, er
// purposes. So they are safe to delete.
// Also deletes completed jobs with GARM runners attached as they are no longer needed.
func (s *sqlDatabase) DeleteInactionableJobs(_ context.Context, olderThan time.Duration) error {
q := s.conn.
Unscoped().
Where("(status != ? AND instance_id IS NULL) OR (status = ? AND instance_id IS NOT NULL)", params.JobStatusQueued, params.JobStatusCompleted)
if olderThan > 0 {
q = q.Where("created_at < ?", time.Now().Add(-olderThan))
// Fetch and delete within a transaction to avoid races.
var jobs []WorkflowJob

err := s.conn.Transaction(func(tx *gorm.DB) error {
q := tx.
Model(&WorkflowJob{}).
Preload("Instance").
Where("(status != ? AND instance_id IS NULL) OR (status = ? AND instance_id IS NOT NULL)", params.JobStatusQueued, params.JobStatusCompleted)
if olderThan > 0 {
q = q.Where("created_at < ?", time.Now().Add(-olderThan))
}
if err := q.Find(&jobs).Error; err != nil {
return fmt.Errorf("fetching inactionable jobs: %w", err)
}

if len(jobs) == 0 {
return nil
}

ids := make([]int64, len(jobs))
for i, j := range jobs {
ids[i] = j.ID
}

if err := tx.Unscoped().Where("id IN ?", ids).Delete(&WorkflowJob{}).Error; err != nil {
return fmt.Errorf("deleting inactionable jobs: %w", err)
}

return nil
})
if err != nil {
return err
}
q = q.Delete(&WorkflowJob{})
if q.Error != nil {
return fmt.Errorf("deleting inactionable jobs: %w", q.Error)

for _, j := range jobs {
asParams, err := sqlWorkflowJobToParamsJob(j)
if err != nil {
slog.With(slog.Any("error", err)).Error("failed to convert job for notify")
continue
}
if notifyErr := s.sendNotify(common.JobEntityType, common.DeleteOperation, asParams); notifyErr != nil {
slog.With(slog.Any("error", notifyErr)).Error("failed to send delete notify for job")
}
}

return nil
}
86 changes: 86 additions & 0 deletions database/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/url"
"regexp"
"strings"
"time"

"gorm.io/driver/mysql"
"gorm.io/driver/sqlite"
Expand Down Expand Up @@ -124,6 +125,11 @@ func NewSQLDatabase(ctx context.Context, cfg config.Database) (common.Store, err
if err := db.migrateDB(); err != nil {
return nil, fmt.Errorf("error migrating database: %w", err)
}

if cfg.DbBackend == config.SQLiteBackend {
go db.startSQLiteMaintenance()
}

return db, nil
}

Expand All @@ -140,6 +146,35 @@ type sqlDatabase struct {
producer common.Producer
}

// startSQLiteMaintenance runs periodic WAL checkpoint and VACUUM on both the
// main database and the objects database (if present). This reclaims disk space
// from deleted rows/blobs and keeps the WAL file from growing unbounded.
func (s *sqlDatabase) startSQLiteMaintenance() {
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()

for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.runSQLiteMaintenance(s.conn, "main")
if s.objectsConn != nil {
s.runSQLiteMaintenance(s.objectsConn, "objects")
}
}
}
}

func (s *sqlDatabase) runSQLiteMaintenance(conn *gorm.DB, dbName string) {
if err := conn.Exec("PRAGMA wal_checkpoint(TRUNCATE)").Error; err != nil {
slog.With(slog.Any("error", err)).ErrorContext(s.ctx, "failed to checkpoint WAL", "database", dbName)
}
if err := conn.Exec("PRAGMA incremental_vacuum").Error; err != nil {
slog.With(slog.Any("error", err)).ErrorContext(s.ctx, "failed to incremental vacuum database", "database", dbName)
}
}

var renameTemplate = `
PRAGMA foreign_keys = OFF;
BEGIN TRANSACTION;
Expand Down Expand Up @@ -450,6 +485,52 @@ func (s *sqlDatabase) migrateFileObjects() error {
return nil
}

// migrateAutoVacuum converts SQLite databases from auto_vacuum=NONE to
// auto_vacuum=INCREMENTAL. This is a one-time migration that allows the
// periodic maintenance goroutine to use PRAGMA incremental_vacuum to
// efficiently reclaim free pages without rebuilding the entire database.
func (s *sqlDatabase) migrateAutoVacuum() error {
if s.cfg.DbBackend != config.SQLiteBackend {
return nil
}

if err := s.enableIncrementalVacuum(s.conn, "main"); err != nil {
return err
}

if s.objectsConn != nil {
if err := s.enableIncrementalVacuum(s.objectsConn, "objects"); err != nil {
return err
}
}

return nil
}

func (s *sqlDatabase) enableIncrementalVacuum(conn *gorm.DB, dbName string) error {
var autoVacuum int
if err := conn.Raw("PRAGMA auto_vacuum").Scan(&autoVacuum).Error; err != nil {
return fmt.Errorf("failed to read auto_vacuum for %s db: %w", dbName, err)
}

// 2 = INCREMENTAL, already set
if autoVacuum == 2 {
return nil
}

slog.InfoContext(s.ctx, "migrating database to incremental auto_vacuum", "database", dbName)

if err := conn.Exec("PRAGMA auto_vacuum = INCREMENTAL").Error; err != nil {
return fmt.Errorf("failed to set auto_vacuum for %s db: %w", dbName, err)
}
// VACUUM is required to apply the auto_vacuum mode change.
if err := conn.Exec("VACUUM").Error; err != nil {
return fmt.Errorf("failed to vacuum %s db for auto_vacuum migration: %w", dbName, err)
}

return nil
}

func (s *sqlDatabase) ensureTemplates(migrateTemplates bool) error {
if !migrateTemplates {
return nil
Expand Down Expand Up @@ -772,6 +853,11 @@ func (s *sqlDatabase) migrateDB() error {
return fmt.Errorf("error migrating file objects: %w", err)
}

// Migrate auto_vacuum mode to incremental for both databases.
if err := s.migrateAutoVacuum(); err != nil {
return fmt.Errorf("error migrating auto_vacuum: %w", err)
}

s.conn.Exec("PRAGMA foreign_keys = ON")

// Migrate controller info if needed
Expand Down
Loading