DEV Community

Cover image for Go Database Migration Performance: Optimizing for Production Environments
Aarav Joshi
Aarav Joshi

Posted on

Go Database Migration Performance: Optimizing for Production Environments

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Database migrations represent one of the most critical operations in production environments. I've spent years working with large-scale systems where a poorly planned migration could lead to hours of downtime or even data corruption. In this article, I'll share practical strategies for optimizing database migration performance in Golang applications.

Understanding Database Migration Challenges

Database migrations become particularly challenging as applications scale. A migration that works smoothly in development often causes significant issues in production. The primary challenges include:

  • Locking tables during schema changes
  • Excessive memory consumption during large data operations
  • Timeouts and connection issues
  • Performance degradation affecting user experience
  • Recovery from failed migrations

In Go applications, these challenges are compounded by the need to balance concurrent operations with database resource constraints.

Strategies for High-Performance Migrations

For effective database migrations in Go, I focus on several core strategies:

Chunked Migrations

Breaking large operations into smaller chunks is essential. Instead of updating millions of rows at once, process them in batches:

func migrateInChunks(db *sql.DB, tableName string, batchSize int) error {
    var maxID int
    err := db.QueryRow("SELECT MAX(id) FROM " + tableName).Scan(&maxID)
    if err != nil {
        return err
    }

    for i := 0; i <= maxID; i += batchSize {
        _, err = db.Exec(fmt.Sprintf(
            "UPDATE %s SET new_column = computed_value WHERE id >= %d AND id < %d", 
            tableName, i, i+batchSize))
        if err != nil {
            return err
        }

        // Allow other operations to proceed
        time.Sleep(10 * time.Millisecond)
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

This approach prevents locking the entire table and reduces memory consumption.

Parallel Processing with Worker Pools

For independent operations, worker pools significantly speed up migrations:

func parallelMigration(db *sql.DB, chunks int, workerCount int) error {
    jobs := make(chan int, chunks)
    results := make(chan error, chunks)

    // Start workers
    for w := 1; w <= workerCount; w++ {
        go worker(db, jobs, results)
    }

    // Send jobs to workers
    for j := 1; j <= chunks; j++ {
        jobs <- j
    }
    close(jobs)

    // Collect results
    for j := 1; j <= chunks; j++ {
        if err := <-results; err != nil {
            return err
        }
    }

    return nil
}

func worker(db *sql.DB, jobs <-chan int, results chan<- error) {
    for chunkID := range jobs {
        // Process chunk
        err := processChunk(db, chunkID)
        results <- err
    }
}

func processChunk(db *sql.DB, chunkID int) error {
    // Calculate chunk bounds
    startID := (chunkID - 1) * 1000
    endID := chunkID * 1000

    _, err := db.Exec("UPDATE large_table SET processed = true WHERE id >= ? AND id < ?", 
                     startID, endID)
    return err
}
Enter fullscreen mode Exit fullscreen mode

This pattern allows you to control concurrency based on database capacity.

Connection Pool Management

Proper connection pool configuration is critical for migration performance:

func configureConnectionPool(db *sql.DB) {
    // Set maximum number of open connections
    db.SetMaxOpenConns(25)

    // Set maximum number of idle connections
    db.SetMaxIdleConns(10)

    // Set maximum lifetime of a connection
    db.SetConnMaxLifetime(15 * time.Minute)

    // Verify connection pool is functioning
    if err := db.Ping(); err != nil {
        log.Fatalf("Could not connect to database: %v", err)
    }
}
Enter fullscreen mode Exit fullscreen mode

These settings should be adjusted based on your database's capacity and the migration's requirements.

Transaction Management

Transactions improve both performance and data integrity:

func migrateWithTransactions(db *sql.DB, tableName string, batchSize int) error {
    var maxID int
    err := db.QueryRow("SELECT MAX(id) FROM " + tableName).Scan(&maxID)
    if err != nil {
        return err
    }

    for i := 0; i <= maxID; i += batchSize {
        tx, err := db.Begin()
        if err != nil {
            return err
        }

        _, err = tx.Exec(fmt.Sprintf(
            "UPDATE %s SET new_column = computed_value WHERE id >= %d AND id < %d", 
            tableName, i, i+batchSize))

        if err != nil {
            tx.Rollback()
            return err
        }

        if err = tx.Commit(); err != nil {
            return err
        }
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

For very large batches, consider setting appropriate transaction isolation levels.

Monitoring and Adaptive Rate Limiting

Adding monitoring helps adjust migration speed based on system load:

func adaptiveMigration(db *sql.DB, table string) error {
    batchSize := 1000
    var processed int

    for {
        before := time.Now()

        tx, err := db.Begin()
        if err != nil {
            return err
        }

        // Process a batch
        result, err := tx.Exec(fmt.Sprintf(
            "UPDATE %s SET processed = true WHERE processed = false LIMIT %d", 
            table, batchSize))

        if err != nil {
            tx.Rollback()
            return err
        }

        affected, _ := result.RowsAffected()
        if affected == 0 {
            // No more rows to process
            tx.Rollback()
            break
        }

        if err = tx.Commit(); err != nil {
            return err
        }

        processed += int(affected)

        // Calculate duration and adjust batch size
        duration := time.Since(before)
        if duration > 200*time.Millisecond {
            // Too slow, reduce batch size
            batchSize = int(float64(batchSize) * 0.8)
            if batchSize < 100 {
                batchSize = 100
            }
        } else if duration < 50*time.Millisecond {
            // Too fast, increase batch size
            batchSize = int(float64(batchSize) * 1.2)
            if batchSize > 10000 {
                batchSize = 10000
            }
        }

        fmt.Printf("Processed %d rows in %v (batch size: %d)\n", 
                  processed, duration, batchSize)
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

This approach automatically adjusts processing speed based on the database's response times.

Advanced Techniques for Complex Migrations

For more complex schema changes, I've found several advanced techniques to be effective:

Shadow Tables for Zero-Downtime Migrations

When adding indexes or changing column types, shadow tables prevent locking:

func migrateThroughShadowTable(db *sql.DB, originalTable string) error {
    // Create shadow table with new schema
    if _, err := db.Exec(fmt.Sprintf("CREATE TABLE %s_new LIKE %s", 
                                   originalTable, originalTable)); err != nil {
        return err
    }

    // Add new columns/indexes to shadow table
    if _, err := db.Exec(fmt.Sprintf("ALTER TABLE %s_new ADD COLUMN new_column TEXT", 
                                   originalTable)); err != nil {
        return err
    }

    // Copy data in batches
    var maxID int
    if err := db.QueryRow(fmt.Sprintf("SELECT MAX(id) FROM %s", originalTable)).Scan(&maxID); err != nil {
        return err
    }

    batchSize := 5000
    for i := 0; i <= maxID; i += batchSize {
        if _, err := db.Exec(fmt.Sprintf(
            "INSERT INTO %s_new SELECT *, COALESCE(some_field, '') FROM %s WHERE id >= %d AND id < %d",
            originalTable, originalTable, i, i+batchSize)); err != nil {
            return err
        }
    }

    // Atomic table swap
    tx, err := db.Begin()
    if err != nil {
        return err
    }

    _, err = tx.Exec(fmt.Sprintf("RENAME TABLE %s TO %s_old, %s_new TO %s", 
                               originalTable, originalTable, originalTable, originalTable))
    if err != nil {
        tx.Rollback()
        return err
    }

    return tx.Commit()
}
Enter fullscreen mode Exit fullscreen mode

This technique is particularly valuable for MySQL, where ALTER TABLE operations can lock tables.

Resumable Migrations with State Tracking

For very large migrations, implement tracking to resume after failures:

func resumableMigration(db *sql.DB) error {
    // Create migration tracking table if it doesn't exist
    if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS migration_progress (
        migration_id VARCHAR(50) PRIMARY KEY,
        last_processed_id INT,
        status VARCHAR(20),
        started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        last_updated TIMESTAMP
    )`); err != nil {
        return err
    }

    migrationID := "add_full_text_search_2023_07_21"
    var lastProcessedID int
    var status string

    // Check if migration exists and get progress
    err := db.QueryRow("SELECT last_processed_id, status FROM migration_progress WHERE migration_id = ?", 
                      migrationID).Scan(&lastProcessedID, &status)

    if err != nil && err != sql.ErrNoRows {
        return err
    }

    if err == sql.ErrNoRows {
        // First run of this migration
        _, err = db.Exec("INSERT INTO migration_progress (migration_id, last_processed_id, status) VALUES (?, 0, 'running')",
                       migrationID)
        if err != nil {
            return err
        }
        lastProcessedID = 0
    } else if status == "completed" {
        // Migration already completed
        fmt.Printf("Migration %s already completed\n", migrationID)
        return nil
    } else {
        // Resume migration
        fmt.Printf("Resuming migration %s from ID %d\n", migrationID, lastProcessedID)
    }

    batchSize := 1000
    for {
        tx, err := db.Begin()
        if err != nil {
            return err
        }

        // Process next batch
        result, err := tx.Exec(`
            UPDATE large_table SET search_vector = to_tsvector(content)
            WHERE id > ? ORDER BY id LIMIT ?`, 
            lastProcessedID, batchSize)

        if err != nil {
            tx.Rollback()
            return err
        }

        affected, _ := result.RowsAffected()
        if affected == 0 {
            // No more rows to process
            tx.Commit()
            break
        }

        // Get highest ID processed in this batch
        var newLastID int
        err = tx.QueryRow(`
            SELECT MAX(id) FROM large_table 
            WHERE id > ? ORDER BY id LIMIT ?`, 
            lastProcessedID, batchSize).Scan(&newLastID)

        if err != nil {
            tx.Rollback()
            return err
        }

        // Update progress
        _, err = tx.Exec(`
            UPDATE migration_progress 
            SET last_processed_id = ?, last_updated = CURRENT_TIMESTAMP
            WHERE migration_id = ?`, 
            newLastID, migrationID)

        if err != nil {
            tx.Rollback()
            return err
        }

        if err = tx.Commit(); err != nil {
            return err
        }

        lastProcessedID = newLastID
        fmt.Printf("Processed up to ID %d\n", lastProcessedID)
    }

    // Mark migration as completed
    _, err = db.Exec(`
        UPDATE migration_progress 
        SET status = 'completed', last_updated = CURRENT_TIMESTAMP
        WHERE migration_id = ?`, 
        migrationID)

    return err
}
Enter fullscreen mode Exit fullscreen mode

This pattern ensures that long-running migrations can survive application restarts or crashes.

Temporary Column Strategy

For complex transformations, use temporary columns to minimize locking:

func safeColumnTransformation(db *sql.DB, table, column string) error {
    // Add temporary column
    if _, err := db.Exec(fmt.Sprintf(
        "ALTER TABLE %s ADD COLUMN %s_new VARCHAR(255)", 
        table, column)); err != nil {
        return err
    }

    // Update in batches
    if err := updateInBatches(db, table, column); err != nil {
        return err
    }

    // Atomic column swap
    tx, err := db.Begin()
    if err != nil {
        return err
    }

    queries := []string{
        fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s", table, column),
        fmt.Sprintf("ALTER TABLE %s RENAME COLUMN %s_new TO %s", table, column, column),
    }

    for _, query := range queries {
        if _, err := tx.Exec(query); err != nil {
            tx.Rollback()
            return err
        }
    }

    return tx.Commit()
}

func updateInBatches(db *sql.DB, table, column string) error {
    var count int
    err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s", table)).Scan(&count)
    if err != nil {
        return err
    }

    batchSize := 5000
    for offset := 0; offset < count; offset += batchSize {
        if _, err := db.Exec(fmt.Sprintf(
            "UPDATE %s SET %s_new = UPPER(%s) LIMIT %d OFFSET %d",
            table, column, column, batchSize, offset)); err != nil {
            return err
        }
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

This approach is particularly useful for column type changes or transformations.

Best Practices for Production Migrations

Beyond the code, I've learned several best practices for executing migrations safely:

Pre-Migration Planning

Before running any migration in production:

  1. Calculate the approximate time required by testing against a production-like dataset
  2. Schedule migrations during low-traffic periods
  3. Create benchmarks to measure database load during different phases
  4. Implement dry-run options for validation

Monitoring During Migrations

During migration execution:

func monitoredMigration(db *sql.DB, migrationFunc func() error) error {
    startTime := time.Now()
    totalRows := countRows(db)
    ticker := time.NewTicker(5 * time.Second)
    done := make(chan bool)

    // Start progress reporter
    go func() {
        for {
            select {
            case <-done:
                return
            case <-ticker.C:
                currentRows := countProcessedRows(db)
                elapsed := time.Since(startTime)
                percent := float64(currentRows) / float64(totalRows) * 100
                estimatedTotal := float64(elapsed) / (float64(currentRows) / float64(totalRows))
                remaining := estimatedTotal - float64(elapsed)

                fmt.Printf("Progress: %.2f%% (%d/%d rows, %v elapsed, ~%v remaining)\n",
                          percent, currentRows, totalRows, elapsed.Round(time.Second), 
                          time.Duration(remaining).Round(time.Second))
            }
        }
    }()

    // Run the migration
    err := migrationFunc()

    // Stop the progress reporter
    ticker.Stop()
    done <- true

    if err != nil {
        fmt.Printf("Migration failed after %v: %v\n", time.Since(startTime), err)
    } else {
        fmt.Printf("Migration completed successfully in %v\n", time.Since(startTime))
    }

    return err
}
Enter fullscreen mode Exit fullscreen mode

This provides visibility into migration progress and helps identify performance issues early.

Post-Migration Verification

After migration completion, verify data integrity:

func verifyMigration(db *sql.DB) error {
    // Check row counts match
    var originalCount, newCount int
    if err := db.QueryRow("SELECT COUNT(*) FROM original_table").Scan(&originalCount); err != nil {
        return err
    }
    if err := db.QueryRow("SELECT COUNT(*) FROM migrated_table").Scan(&newCount); err != nil {
        return err
    }

    if originalCount != newCount {
        return fmt.Errorf("row count mismatch: %d vs %d", originalCount, newCount)
    }

    // Sample data verification
    rows, err := db.Query(`
        SELECT original.id, original.value, migrated.value 
        FROM original_table original
        JOIN migrated_table migrated ON original.id = migrated.id
        ORDER BY RAND() LIMIT 100
    `)
    if err != nil {
        return err
    }
    defer rows.Close()

    var mismatchCount int
    for rows.Next() {
        var id int
        var originalValue, migratedValue string
        if err := rows.Scan(&id, &originalValue, &migratedValue); err != nil {
            return err
        }

        if originalValue != migratedValue {
            mismatchCount++
            fmt.Printf("Mismatch for ID %d: %s vs %s\n", id, originalValue, migratedValue)
        }
    }

    if mismatchCount > 0 {
        return fmt.Errorf("found %d data mismatches in sample", mismatchCount)
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

This validates that data was correctly migrated and helps catch any issues quickly.

A Complete Migration Framework

Combining these techniques, I've developed a comprehensive framework for database migrations in Go:

package migration

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "sync"
    "time"
)

type MigrationStep struct {
    Name        string
    Description string
    Execute     func(ctx context.Context, tx *sql.Tx) error
}

type ChunkedMigration struct {
    Name        string
    Description string
    ChunkCount  int
    ChunkFn     func(ctx context.Context, tx *sql.Tx, chunkID int) error
    Concurrency int
}

type Migrator struct {
    db *sql.DB
}

func NewMigrator(db *sql.DB) *Migrator {
    return &Migrator{db: db}
}

func (m *Migrator) RunStep(step MigrationStep) error {
    log.Printf("Starting migration step: %s", step.Name)
    start := time.Now()

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
    defer cancel()

    tx, err := m.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
    if err != nil {
        return fmt.Errorf("failed to begin transaction: %w", err)
    }

    if err := step.Execute(ctx, tx); err != nil {
        tx.Rollback()
        return fmt.Errorf("migration step failed: %w", err)
    }

    if err := tx.Commit(); err != nil {
        return fmt.Errorf("failed to commit transaction: %w", err)
    }

    log.Printf("Completed migration step '%s' in %v", step.Name, time.Since(start))
    return nil
}

func (m *Migrator) RunChunkedMigration(migration ChunkedMigration) error {
    log.Printf("Starting chunked migration: %s", migration.Name)
    start := time.Now()

    // Create context for the entire migration
    ctx, cancel := context.WithTimeout(context.Background(), 24*time.Hour)
    defer cancel()

    // Setup tracking table
    if _, err := m.db.ExecContext(ctx, `
        CREATE TABLE IF NOT EXISTS migration_chunks (
            migration_name VARCHAR(100),
            chunk_id INT,
            status VARCHAR(20),
            started_at TIMESTAMP,
            completed_at TIMESTAMP,
            error TEXT,
            PRIMARY KEY (migration_name, chunk_id)
        )
    `); err != nil {
        return fmt.Errorf("failed to create tracking table: %w", err)
    }

    // Determine chunks to process (skipping already completed ones)
    chunks := make([]int, 0, migration.ChunkCount)
    for i := 0; i < migration.ChunkCount; i++ {
        var status string
        err := m.db.QueryRowContext(ctx, 
            "SELECT status FROM migration_chunks WHERE migration_name = $1 AND chunk_id = $2",
            migration.Name, i).Scan(&status)

        if err == sql.ErrNoRows || (err == nil && status != "completed") {
            chunks = append(chunks, i)
        } else if err != nil {
            return fmt.Errorf("failed to check chunk status: %w", err)
        }
    }

    log.Printf("Found %d chunks to process out of %d total", len(chunks), migration.ChunkCount)

    if len(chunks) == 0 {
        log.Printf("All chunks already completed for migration '%s'", migration.Name)
        return nil
    }

    // Process chunks with worker pool
    var wg sync.WaitGroup
    errors := make(chan error, len(chunks))
    semaphore := make(chan struct{}, migration.Concurrency)

    for _, chunk := range chunks {
        wg.Add(1)
        go func(chunkID int) {
            defer wg.Done()
            semaphore <- struct{}{}
            defer func() { <-semaphore }()

            // Mark chunk as in progress
            _, err := m.db.ExecContext(ctx, `
                INSERT INTO migration_chunks (migration_name, chunk_id, status, started_at)
                VALUES ($1, $2, 'in_progress', NOW())
                ON CONFLICT (migration_name, chunk_id) 
                DO UPDATE SET status = 'in_progress', started_at = NOW(), error = NULL
            `, migration.Name, chunkID)

            if err != nil {
                errors <- fmt.Errorf("failed to update chunk status: %w", err)
                return
            }

            // Process the chunk
            chunkCtx, chunkCancel := context.WithTimeout(ctx, 30*time.Minute)
            defer chunkCancel()

            tx, err := m.db.BeginTx(chunkCtx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
            if err != nil {
                m.recordChunkError(ctx, migration.Name, chunkID, err)
                errors <- fmt.Errorf("failed to begin transaction: %w", err)
                return
            }

            err = migration.ChunkFn(chunkCtx, tx, chunkID)
            if err != nil {
                tx.Rollback()
                m.recordChunkError(ctx, migration.Name, chunkID, err)
                errors <- fmt.Errorf("chunk %d failed: %w", chunkID, err)
                return
            }

            if err = tx.Commit(); err != nil {
                m.recordChunkError(ctx, migration.Name, chunkID, err)
                errors <- fmt.Errorf("failed to commit chunk %d: %w", chunkID, err)
                return
            }

            // Mark chunk as completed
            _, err = m.db.ExecContext(ctx, `
                UPDATE migration_chunks 
                SET status = 'completed', completed_at = NOW()
                WHERE migration_name = $1 AND chunk_id = $2
            `, migration.Name, chunkID)

            if err != nil {
                errors <- fmt.Errorf("failed to update chunk status: %w", err)
                return
            }

            log.Printf("Completed chunk %d of migration '%s'", chunkID, migration.Name)
        }(chunk)
    }

    // Wait for all goroutines and collect errors
    go func() {
        wg.Wait()
        close(errors)
    }()

    // Process errors
    var failedChunks int
    for err := range errors {
        failedChunks++
        log.Printf("Error: %v", err)
    }

    if failedChunks > 0 {
        return fmt.Errorf("%d chunks failed to process", failedChunks)
    }

    log.Printf("Completed migration '%s' in %v", migration.Name, time.Since(start))
    return nil
}

func (m *Migrator) recordChunkError(ctx context.Context, migrationName string, chunkID int, err error) {
    _, execErr := m.db.ExecContext(ctx, `
        UPDATE migration_chunks 
        SET status = 'failed', error = $3
        WHERE migration_name = $1 AND chunk_id = $2
    `, migrationName, chunkID, err.Error())

    if execErr != nil {
        log.Printf("Failed to record chunk error: %v", execErr)
    }
}
Enter fullscreen mode Exit fullscreen mode

This framework supports both simple migrations and complex chunked operations with automatic resumability.

Real-World Applications

I've applied these techniques in various production scenarios:

  1. Migrating a 300GB product catalog with zero downtime
  2. Converting a text field to JSON for 50 million user records
  3. Adding full-text search to a legacy application
  4. Splitting a monolithic database into microservice-specific schemas

In each case, these strategies significantly reduced migration time and minimized production impact.

The most important lesson I've learned is to treat database migrations with the same level of care as critical application code. By planning thoroughly, implementing robust migration patterns, and monitoring execution closely, even the most complex database migrations can be performed safely and efficiently.

By applying these Go-specific patterns and best practices, you can transform database migrations from a source of stress to a routine operational task. The performance optimizations discussed here not only make migrations faster but also more reliable, which is crucial for maintaining high availability in production systems.


101 Books

101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.

Check out our book Golang Clean Code available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!

Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva

Top comments (0)