Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Cequel adapter for Cassandra #1033

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions elasticsearch-model/examples/cequel_article.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Cequel and Elasticsearch
# ==================================
#
# https://github.com/cequel/cequel


$LOAD_PATH.unshift File.expand_path('../../lib', __FILE__)

require 'pry'
Pry.config.history.file = File.expand_path('/tmp/elasticsearch_development.pry', __FILE__)

require 'benchmark'
require 'logger'

require 'ansi/core'
require 'cequel'

require 'elasticsearch/model'
require 'elasticsearch/model/callbacks'

require 'rake'

# Load default tasks from Cequel
#
spec = Gem::Specification.find_by_name 'cequel'
load "#{spec.gem_dir}/lib/cequel/record/tasks.rb"

# Cassandra connection settings
#
cequel_config = {
host: '127.0.0.1',
port: 9042,
keyspace: 'cequel_test',
max_retries: 3,
retry_delay: 1,
replication: {
class: 'SimpleStrategy',
replication_factor: 1
}
}

# Elastic config
#
elastic_config = {
host: 'localhost:9200',
log: true
}

connection = Cequel.connect cequel_config
Cequel::Record.connection = connection

Elasticsearch::Model.client = Elasticsearch::Client.new elastic_config


class Article
include Cequel::Record

key :id, :int

column :title, :text
column :published_at, :timestamp

def as_indexed_json(options = {})
as_json(except: [:id, :_id])
end
end

Article.__send__ :include, Elasticsearch::Model
Article.__send__ :include, Elasticsearch::Model::Callbacks

# Initialize Cassandra and synchronize schema
#
Rake.application['cequel:reset'].invoke
Article.synchronize_schema

Article.delete_all
Article.new(id: 1, title: 'Foo').save!
Article.new(id: 2, title: 'Bar').save!

client = Elasticsearch::Client.new elastic_config

client.indices.delete index: 'articles' rescue nil


client.bulk index: 'articles',
type: 'article',
body: Article.all.map { |a| { index: { _id: a.id, data: a.attributes } } },
refresh: true

Article.new(id: 3, title: 'Foo Bar').save!

response = Article.search 'bar'
#x = response.records
#puts x.class
#puts x.to_a.to_s


#puts x.records.where({ :id => [3] })

# puts Benchmark.realtime { 9_875.times { |i| Article.new( id: i, title: "Foo #{i}").save! } }

Pry.start(binding, prompt: lambda { |obj, nest_level, _| '> ' },
input: StringIO.new('response.records.to_a'),
quiet: true)
1 change: 1 addition & 0 deletions elasticsearch-model/lib/elasticsearch/model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
require 'elasticsearch/model/adapter'
require 'elasticsearch/model/adapters/default'
require 'elasticsearch/model/adapters/active_record'
require 'elasticsearch/model/adapters/cequel'
require 'elasticsearch/model/adapters/mongoid'
require 'elasticsearch/model/adapters/multiple'

Expand Down
78 changes: 78 additions & 0 deletions elasticsearch-model/lib/elasticsearch/model/adapters/cequel.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
module Elasticsearch
module Model
module Adapter

# An adapter for Cequel-based models
#
# @see https://github.com/cequel/cequel
#
module Cequel

Adapter.register self, lambda { |klass| !!defined?(::Cequel::Record) && klass.respond_to?(:ancestors) && klass.ancestors.include?(::Cequel::Record) }

module Records

# Return a `Cequel::RecordSet` instance
#
def records
pk = klass.key_column_names[0]
res = klass.where(pk => ids)

res.instance_exec(response.response['hits']['hits']) do |hits|
define_singleton_method :to_a do
self.entries.sort_by do |e|
hits.index do |hit|
hit['_id'].to_s == e.id.to_s
end
end
end
end

res
end
end

module Callbacks

# Handle index updates (creating, updating or deleting documents)
# when the model changes, by hooking into the lifecycle
#
# @see https://github.com/cequel/cequel/blob/master/lib/cequel/record/callbacks.rb
#
def self.included(base)
[:save, :create, :update].each do |item|
base.send("after_#{ item }", lambda { __elasticsearch__.index_document })
end

base.after_destroy { __elasticsearch__.delete_document }
end
end

module Importing
# Fetch batches of records from the database (used by the import method)
#
# @see http://api.rubyonrails.org/classes/ActiveRecord/Batches.html ActiveRecord::Batches.find_in_batches
#
def __find_in_batches(options = {}, &block)
query = options.delete(:query)
named_scope = options.delete(:scope)
preprocess = options.delete(:preprocess)

scope = self
scope = scope.__send__(named_scope) if named_scope
scope = scope.instance_exec(&query) if query

scope.find_in_batches(**options) do |batch|
batch = self.__send__(preprocess, batch) if preprocess
yield(batch) if batch.present?
end
end

def __transform
lambda { |model| { index: { _id: model.id, data: model.__elasticsearch__.as_indexed_json } } }
end
end
end
end
end
end