Commit

🎉 create import script
ohararyan committed Apr 18, 2023
0 parents commit 004ef78
Showing 5 changed files with 187 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
.env
progress.txt
4 changes: 4 additions & 0 deletions Gemfile
@@ -0,0 +1,4 @@
# frozen_string_literal: true
source 'https://rubygems.org'

gem 'httparty'
17 changes: 17 additions & 0 deletions Gemfile.lock
@@ -0,0 +1,17 @@
GEM
remote: https://rubygems.org/
specs:
httparty (0.21.0)
mini_mime (>= 1.0.0)
multi_xml (>= 0.5.2)
mini_mime (1.1.2)
multi_xml (0.6.0)

PLATFORMS
arm64-darwin-22

DEPENDENCIES
httparty

BUNDLED WITH
2.4.6
39 changes: 39 additions & 0 deletions README.md
@@ -0,0 +1,39 @@
# Amplitude Bulk Uploader

For bulk uploading/backfilling data into an Amplitude project based on an export of project data.

This script is inspired by, and updated from, the original [ello/amplitude-import](https://github.com/ello/amplitude-import/tree/master) script.

Simply download your existing project data from Amplitude and point the script at the folder. It will parse and upload the files, logging each completed file to a plain-text progress file, so if you cancel the script at any time you can pick up the import where you left off.
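
For reference, each line of the progress file records the uploaded file's size and path, separated by a tab. It looks something like this (the size and file name here are purely illustrative):

```
64213	export/123456/2023-04-01_0#0.json.gz
```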

---

## Prerequisites

* Get an API key from Amplitude for the project that you wish to import into.

> ⚠️ It's recommended to create a new test project first, run the import against it, and confirm everything looks as expected before importing into your production project.
* Export all of your existing project data from Amplitude's project detail pane. Download and unzip the file; this will create a new folder containing a set of gzipped files (see the example layout below). There's nothing else you need to do here; the script handles parsing the gzipped data.
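
For illustration only, an unzipped export might look something like the following (the exact file names will vary; the script simply scans the folder recursively for `*.gz` files):

```
export/
└── 123456/
    ├── 2023-04-01_0#0.json.gz
    ├── 2023-04-01_1#0.json.gz
    └── 2023-04-02_0#0.json.gz
```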

---

## Usage

* Install the dependencies

```
bundle install
```

* Run the script

```
API_KEY=<your API key> bundle exec ruby import.rb <path to folder>
```
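
Each successfully uploaded file is appended to `progress.txt`, so re-running the same command resumes where the previous run stopped. Delete `progress.txt` if you want to force a full re-import.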

---

### Credits

Thanks to [ello](https://github.com/ello/amplitude-import/tree/master) for the initial script on which this one is based.
125 changes: 125 additions & 0 deletions import.rb
@@ -0,0 +1,125 @@
require 'rubygems'
require 'bundler/setup'
require 'logger'
require 'json'
require 'zlib'
require 'httparty'

unless ENV['API_KEY']
  abort 'Must set API_KEY'
end

class AmplitudeImporter
  API_KEY = ENV['API_KEY'].freeze
  ENDPOINT = 'https://api2.amplitude.com/batch'.freeze
  PROGRESS_FILE_PATH = 'progress.txt'.freeze
  RATE_LIMIT = 1000 # max events per batch; batches are sent at most once per second

  def read_and_merge_events(file_path)
    logger.info "Processing #{file_path}"

    # Create an array to hold the events
    events = []

    # Read the gzipped file, closing it when done
    Zlib::GzipReader.open(file_path) do |json_data|
      # Parse each line of the file as JSON and add it to the events array
      json_data.each_line do |line|
        begin
          events << JSON.parse(line)
        rescue JSON::ParserError => e
          logger.error "Error parsing JSON: #{e}"
        end
      end
    end

    events
  end

  def send_events_to_amplitude(events)
    # Divide the events into batches of RATE_LIMIT (1000) events
    batches = events.each_slice(RATE_LIMIT).to_a

    start_time = Time.now

    batches.each do |event_batch|
      # Wait if needed to comply with Amplitude's rate limit
      elapsed_time = Time.now - start_time
      sleep(1 - elapsed_time) if elapsed_time < 1

      # Bulk upload the events to Amplitude's batch endpoint
      body = {
        api_key: API_KEY,
        events: event_batch,
        options: {
          # Accept IDs shorter than Amplitude's default 5-character minimum
          min_id_length: 0
        }
      }.to_json

      response = HTTParty.post(
        ENDPOINT,
        headers: {
          'Content-Type' => 'application/json'
        },
        body: body
      )

      # Check the response status and handle accordingly
      if response.code.to_i == 200
        logger.info "Submitted batch of #{event_batch.length} events (#{events.length} total) successfully"
      else
        logger.error "Failed to upload events: #{response.code} - #{response.message}"

        # Log error details and signal failure to the caller
        logger.error "Error Body: #{response.body}"
        return false
      end

      # Update start time for next rate limit check
      start_time = Time.now
    end

    true
  end

  def run(directory)
    abort 'Must pass the path to the export folder' unless directory

    submitted_count = 0
    failed_count = 0

    # Load progress from the progress file, if present (each line is "<size>\t<path>")
    processed_files =
      if File.exist?(PROGRESS_FILE_PATH)
        File.read(PROGRESS_FILE_PATH).split("\n").map(&:strip).reject(&:empty?).map { |line| line.split("\t")[1] }
      else
        []
      end

    files = Dir.glob("#{directory}/**/*.gz")

    logger.info "Processing #{files.length} files"

    files.each do |file_path|
      next if processed_files.include?(file_path) # Skip files that were already processed

      # Read the events from the file and upload them
      events = read_and_merge_events(file_path)
      success = send_events_to_amplitude(events)

      if success
        processed_files << file_path
        File.write(PROGRESS_FILE_PATH, "#{File.size(file_path)}\t#{file_path}\n", mode: 'a')
        submitted_count += events.length
      else
        failed_count += events.length
        # If an upload fails, stop processing files to avoid duplicate events
        break
      end
    end

    logger.info "Submitted #{submitted_count} events"
    logger.info "Failed to submit #{failed_count} events"
    logger.info 'Check the log for more details'
  end

  private

  def logger
    @logger ||= Logger.new(STDERR)
  end
end

AmplitudeImporter.new.run(ARGV[0])
