Long Running Data Migrations in Rails

A post by Peter Hollows about rails, useful gems, and database fu. Posted 8 months ago.

Projects almost never start with an empty database. There are many takes on how this initial data, or data processing tasks, should live in your application. This approach keeps data out of your migrations and remains flexible enough to run with all the methods available to a standard migration, plus some extras.

Initial application data, also known as seed data, can be loaded with the seed data support baked into Rails. Our approach can load seed data as well, but is less standard and more for long-running data processing tasks.

I built this to help me crunch numbers on 12G worth of geometric data in text files to produce seed data for an application. Most of the logic for this task was present in the application model space so it made sense to use the Rails environment to process the data. However, as I’m largely making use of PostGIS extensions and SQL my first approach was to use simply use an SQL script for the task. I needed something in the middle.

What you get

$ screen -a
$ rake db:process:land_titles
-- Loading titles
   -> 14541.2230s
-- Processing titles
Shaping:  51% |oooooooooooooooooooo                    | ETA:  25:01:34

Implementation

The Rails migration class is the key here, it has access to helper methods for interacting with the database and also access to your model space.

I am using progress bars in my solution (processing takes days) which require the progressbar gem. Install using:

sudo gem install progressbar

As the data loading/processing tasks is invoked with Rake, we need to write the task. Use something like the following to call your processor.

# lib/tasks/processing.rake
namespace [:db, :process] do
  desc "Load and process land title data"
  task :land_titles => :environment do
    TitleDataProcessor.run
  end
end

Now we have our rake task, we need to write our processor class. This is an example of one used in a project. I’ve included a library for ActiveRecord::Base to give us useful processing helpers.

# lib/title_data_processor.rb
class TitleDataProcessor < ActiveRecord::Migration
  class << self
    # Invoked by the rake task.
    def run
      ActiveRecord::Base.send(:include, ModelExtensions::BatchProcessing)
      
      # We uncomment this transaction block when we have it working.
      # ActiveRecord::Base.transaction do
        say_with_time("Loading titles") { load_titles }
        say_with_time("Processing titles") do
          process_title_shape
          process_title_recommendation
        end
      # end
    end
    
    def load_titles
      LandTitle.load_from_raw
    end
    
    def process_title_shape
      # This will give you a nice progress bar and call the block
      # with id ranges letting you batch-update and view progress.
      LandTitle.batch_process('Shaping') do |start_id, next_id|
        suppress_messages do
          execute %?
            UPDATE land_titles
            SET shape = ST_Translate(
              ST_GeomFromText(substring(shape from 3), 4167),
              160, 0, 0
            )
            WHERE shape IS NOT NULL
            AND id >= #{start_id}
            AND id <  #{next_id}
          ?
          
          execute <<-SQL
            UPDATE land_titles SET
            lat = RADIANS(ST_Y(ST_Centroid(shape_geom))),
            lng = RADIANS(ST_X(ST_Centroid(shape_geom)))
            WHERE id >= #{start_id}
            AND   id <  #{next_id}
          SQL
        end
      end
    end
    
    def process_title_recommendation
      # This will give a nice progress bar too, the only difference
      # is process_each calls with block with individual records
      # instead of id ranges.
      LandTitle.process_each('Thinking') do |land_title|
        land_title.calculate_recommendation!
      end
    end
  end
end

Here’s the BatchProcessing module that we’re mixing into ActiveRecord to keep our data processing logic tidy.

module BatchProcessing
  def self.included(base)
    base.extend ClassMethods
  end
  
  module ClassMethods
    def batch_process(title = 'Processing', batch_size = 1000, &block)
      progress = ProgressBar.new(title, count)
      start_id = minimum(:id) || 0
      
      loop do
        next_id = first(:select => 'id', :order => 'id', :conditions => ['? < id', start_id], :offset => batch_size).try(:id)
        next_id ||= maximum(:id) || 0
        records = count(:conditions => ['id >= id AND id < ?', start_id, next_id])
        break if records.zero?
        yield(start_id, next_id)
        progress.inc(records)
        start_id = next_id
      end
      
      progress.finish
    end
    
    def process_each(options = {}, &block)
      title = options.delete(:title) || 'Processing'
      progress = ProgressBar.new(title, count)
      
      find_each(options) do |record|
        yield(record)
        progress.inc
      end
      
      progress.finish
    end
  end
end