From 4b68c48fbeb284e2a288fb0bb5f47dd52c729966 Mon Sep 17 00:00:00 2001 From: Nhan Nguyen Date: Thu, 30 Sep 2021 15:42:41 +0700 Subject: [PATCH 1/2] Support Cequel adapter --- .../examples/cequel_article.rb | 104 ++++++++++++++++++ .../lib/elasticsearch/model.rb | 1 + .../elasticsearch/model/adapters/cequel.rb | 78 +++++++++++++ 3 files changed, 183 insertions(+) create mode 100644 elasticsearch-model/examples/cequel_article.rb create mode 100644 elasticsearch-model/lib/elasticsearch/model/adapters/cequel.rb diff --git a/elasticsearch-model/examples/cequel_article.rb b/elasticsearch-model/examples/cequel_article.rb new file mode 100644 index 00000000..138a374b --- /dev/null +++ b/elasticsearch-model/examples/cequel_article.rb @@ -0,0 +1,104 @@ +# Cequel and Elasticsearch +# ================================== +# +# https://github.com/cequel/cequel + + +$LOAD_PATH.unshift File.expand_path('../../lib', __FILE__) + +require 'pry' +Pry.config.history.file = File.expand_path('/tmp/elasticsearch_development.pry', __FILE__) + +require 'benchmark' +require 'logger' + +require 'ansi/core' +require 'cequel' + +require 'elasticsearch/model' +require 'elasticsearch/model/callbacks' + +require 'rake' + +# Load default tasks from Cequel +# +spec = Gem::Specification.find_by_name 'cequel' +load "#{spec.gem_dir}/lib/cequel/record/tasks.rb" + +# Cassandra connection settings +# +cequel_config = { + host: '127.0.0.1', + port: 9042, + keyspace: 'cequel_test', + max_retries: 3, + retry_delay: 1, + replication: { + class: 'SimpleStrategy', + replication_factor: 1 + } +} + +# Elastic config +# +elastic_config = { + host: 'localhost:9200', + log: true +} + +connection = Cequel.connect cequel_config +Cequel::Record.connection = connection + +Elasticsearch::Model.client = Elasticsearch::Client.new elastic_config + + +class Article + include Cequel::Record + + key :id, :int + + column :title, :text + column :published_at, :timestamp + + def as_indexed_json(options = {}) + as_json(except: [:id, :_id]) + end +end + +Article.__send__ :include, Elasticsearch::Model +Article.__send__ :include, Elasticsearch::Model::Callbacks + +# Initialize Cassandra and synchronize schema +# +Rake.application['cequel:reset'].invoke +Article.synchronize_schema + +Article.delete_all +Article.new(id: 1, title: 'Foo').save! +Article.new(id: 2, title: 'Bar').save! + +client = Elasticsearch::Client.new elastic_config + +client.indices.delete index: 'articles' rescue nil + + +client.bulk index: 'articles', + type: 'article', + body: Article.all.map { |a| { index: { _id: a.id, data: a.attributes } } }, + refresh: true + +Article.new(id: 3, title: 'Foo Bar').save! + +response = Article.search 'bar' +#x = response.records +#puts x.class +#puts x.to_a.to_s + + +#puts x.records.where({ :id => [3] }) + +# puts Benchmark.realtime { 9_875.times { |i| Article.new( id: i, title: "Foo #{i}").save! } } + +Pry.start(binding, prompt: lambda { |obj, nest_level, _| '> ' }, + input: StringIO.new('response.records.to_a'), + quiet: true) diff --git a/elasticsearch-model/lib/elasticsearch/model.rb b/elasticsearch-model/lib/elasticsearch/model.rb index af2f34cb..1921002e 100644 --- a/elasticsearch-model/lib/elasticsearch/model.rb +++ b/elasticsearch-model/lib/elasticsearch/model.rb @@ -31,6 +31,7 @@ require 'elasticsearch/model/adapter' require 'elasticsearch/model/adapters/default' require 'elasticsearch/model/adapters/active_record' +require 'elasticsearch/model/adapters/cequel' require 'elasticsearch/model/adapters/mongoid' require 'elasticsearch/model/adapters/multiple' diff --git a/elasticsearch-model/lib/elasticsearch/model/adapters/cequel.rb b/elasticsearch-model/lib/elasticsearch/model/adapters/cequel.rb new file mode 100644 index 00000000..89438bdd --- /dev/null +++ b/elasticsearch-model/lib/elasticsearch/model/adapters/cequel.rb @@ -0,0 +1,78 @@ +module Elasticsearch + module Model + module Adapter + + # An adapter for Cequel-based models + # + # @see https://github.com/cequel/cequel + # + module Cequel + + Adapter.register self, lambda { |klass| !!defined?(::Cequel::Record) && klass.respond_to?(:ancestors) && klass.ancestors.include?(::Cequel::Record) } + + module Records + + # Return a `Cequel::RecordSet` instance + # + def records + pk = klass.key_column_names[0] + res = klass.where(pk => ids) + + res.instance_exec(response.response['hits']['hits']) do |hits| + define_singleton_method :to_a do + self.entries.sort_by do |e| + hits.index do |hit| + hit['_id'].to_s == e.id.to_s + end + end + end + end + + return res + end + end + + module Callbacks + + # Handle index updates (creating, updating or deleting documents) + # when the model changes, by hooking into the lifecycle + # + # @see https://github.com/cequel/cequel/blob/master/lib/cequel/record/callbacks.rb + # + def self.included(base) + [:save, :create, :update].each do |item| + base.send("after_#{ item }", lambda { __elasticsearch__.index_document }) + end + + base.after_destroy { __elasticsearch__.delete_document } + end + end + + module Importing + # Fetch batches of records from the database (used by the import method) + # + # @see http://api.rubyonrails.org/classes/ActiveRecord/Batches.html ActiveRecord::Batches.find_in_batches + # + def __find_in_batches(options={}, &block) + query = options.delete(:query) + named_scope = options.delete(:scope) + preprocess = options.delete(:preprocess) + + scope = self + scope = scope.__send__(named_scope) if named_scope + scope = scope.instance_exec(&query) if query + + scope.find_in_batches(**options) do |batch| + batch = self.__send__(preprocess, batch) if preprocess + yield(batch) if batch.present? + end + end + + def __transform + lambda { |model| { index: { _id: model.id, data: model.__elasticsearch__.as_indexed_json } } } + end + end + end + end + end +end From 75fa76d5e9759810c0e28fc39fc46ffd63a5994a Mon Sep 17 00:00:00 2001 From: Nhan Nguyen Date: Thu, 30 Sep 2021 15:48:06 +0700 Subject: [PATCH 2/2] Update code style. --- .../lib/elasticsearch/model/adapters/cequel.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/elasticsearch-model/lib/elasticsearch/model/adapters/cequel.rb b/elasticsearch-model/lib/elasticsearch/model/adapters/cequel.rb index 89438bdd..b64da5d6 100644 --- a/elasticsearch-model/lib/elasticsearch/model/adapters/cequel.rb +++ b/elasticsearch-model/lib/elasticsearch/model/adapters/cequel.rb @@ -28,7 +28,7 @@ def records end end - return res + res end end @@ -53,7 +53,7 @@ module Importing # # @see http://api.rubyonrails.org/classes/ActiveRecord/Batches.html ActiveRecord::Batches.find_in_batches # - def __find_in_batches(options={}, &block) + def __find_in_batches(options = {}, &block) query = options.delete(:query) named_scope = options.delete(:scope) preprocess = options.delete(:preprocess)