From 7f15a437666bccf70fe46ee5632585752bb8de0e Mon Sep 17 00:00:00 2001 From: Pranav Prakash Date: Wed, 27 Jul 2016 15:10:12 +0530 Subject: [PATCH 1/3] First draft of elasticsearch integration --- octo-core/Gemfile.lock | 8 +- octo-core/bin/clean_build.sh | 11 ++- octo-core/lib/octocore.rb | 3 + octo-core/lib/octocore/models/user.rb | 91 +++++++++++++++++++-- octo-core/lib/octocore/schedeuleable.rb | 2 +- octo-core/lib/octocore/search/client.rb | 17 +++- octo-core/lib/octocore/search/indexer.rb | 29 +++++++ octo-core/lib/octocore/search/searchable.rb | 31 ++++--- octo-core/lib/octocore/search/setup.rb | 3 + octo-core/lib/octocore/utils.rb | 15 ++++ octo-core/octocore.gemspec | 1 + octo-notification/octonotification.gemspec | 2 +- 12 files changed, 190 insertions(+), 23 deletions(-) diff --git a/octo-core/Gemfile.lock b/octo-core/Gemfile.lock index 05aaf57..9149e0c 100644 --- a/octo-core/Gemfile.lock +++ b/octo-core/Gemfile.lock @@ -5,6 +5,7 @@ PATH cequel (~> 1.9, >= 1.9.0) descriptive_statistics (~> 2.5.1, >= 2.5.0) elasticsearch (~> 1.0.17, >= 1.0.17) + elasticsearch-model (~> 0.1.9, >= 0.1.9) faraday (~> 0.9.2, >= 0.9.2) hiredis (~> 0.6.1, >= 0.6.0) hooks (~> 0.4.1, >= 0.4.1) @@ -41,11 +42,16 @@ GEM elasticsearch-transport (= 1.0.18) elasticsearch-api (1.0.18) multi_json + elasticsearch-model (0.1.9) + activesupport (> 3) + elasticsearch (> 0.4) + hashie elasticsearch-transport (1.0.18) faraday multi_json faraday (0.9.2) multipart-post (>= 1.2, < 3) + hashie (3.4.4) hiredis (0.6.1) hooks (0.4.1) uber (~> 0.0.14) @@ -90,7 +96,7 @@ GEM diff-lcs (>= 1.2.0, < 2.0) rspec-support (~> 3.4.0) rspec-support (3.4.1) - ruby-kafka (0.3.10) + ruby-kafka (0.3.11) rufus-scheduler (3.2.1) sinatra (1.4.7) rack (~> 1.5) diff --git a/octo-core/bin/clean_build.sh b/octo-core/bin/clean_build.sh index 42e6435..99258ec 100755 --- a/octo-core/bin/clean_build.sh +++ b/octo-core/bin/clean_build.sh @@ -1,2 +1,11 @@ +#!/bin/bash + +# Remove any existing gem present rm *.gem -gem build octocore.gemspec && gem uninstall octocore --force && gem install octocore-0.0.1.gem + +# Build the gem +gem build octocore.gemspec && gem uninstall octocore --force + + +# Install it +find . -name "*.gem" | xargs gem install diff --git a/octo-core/lib/octocore.rb b/octo-core/lib/octocore.rb index df6024e..7391dd2 100644 --- a/octo-core/lib/octocore.rb +++ b/octo-core/lib/octocore.rb @@ -127,6 +127,8 @@ def self._connect(configuration) self.logger.info('Setting callbacks.') + #Elasticsearch::Model.client = Octo::Search::Client.new + end # Creates a logger for Octo @@ -140,3 +142,4 @@ def self.logger end end + diff --git a/octo-core/lib/octocore/models/user.rb b/octo-core/lib/octocore/models/user.rb index 79aa6d1..02db92a 100644 --- a/octo-core/lib/octocore/models/user.rb +++ b/octo-core/lib/octocore/models/user.rb @@ -1,21 +1,102 @@ require 'cequel' +require 'octocore/search/searchable' module Octo class User include Cequel::Record + include Octo::Search::Searchable belongs_to :enterprise, class_name: 'Octo::Enterprise' key :id, :bigint + column :last_login, :timestamp + column :curr_city, :text + column :curr_state, :text + column :curr_country, :text + + column :home_location_lat, :float + column :home_location_lon, :float + + column :work_location_lat, :float + column :work_location_lon, :float + timestamps # Define the associations - has_many :user_location_histories - has_many :user_phone_details - has_many :push_token - has_many :user_browser_details - has_many :user_personas + has_many :user_location_histories, class_name: 'Octo::UserLocationHistory' + has_many :user_phone_details, class_name: 'Octo::UserPhoneDetails' + has_many :push_token, class_name: 'Octo::PushToken' + has_many :user_browser_details, class_name: 'Octo::UserBrowserDetails' + has_many :user_personas, class_name: 'Octo::UserPersona' + + + # Returns the data for indexing purposess. + def indexed_json + i = Hash.new + i.merge!({ + id: id, + userid: id, + enterpriseid: enterprise.id.to_s, + created: created_at, + updated: updated_at, + last_login: last_login, + device_id: device_ids, + manufacturer: device_manufacturers, + model: device_models + }) + i[:city] = curr_city if curr_city + i[:country] = curr_country if curr_country + i[:state] = curr_state if curr_state + i[:os] = os if os + i[:browser] = browsers if browsers.count > 0 + i[:engagement] = engagement if engagement + i[:home_location] = home_location if home_location + i[:work_location] = work_location if work_location + i[:persona] = user_personas if user_personas.count > 0 + i[:time_slots] = time_slots if time_slots.count > 0 + i + end + + def device_ids + user_phone_details.collect { |x| x.deviceid } + end + + def device_manufacturers + user_phone_details.collect { |x| x.manufacturer } + end + + def device_models + user_phone_details.collect { |x| x.model } + end + + def browsers + [] + end + + def os + [] + end + + def engagement + 1 + end + + def home_location + if home_location_lat and home_location_lon + [home_location_lat, home_location_lon].as_geopoint + end + end + + def work_location + if work_location_lat and work_location_lon + [work_location_lat, work_location_lon].as_geopoint + end + end + + def time_slots + [] + end end end diff --git a/octo-core/lib/octocore/schedeuleable.rb b/octo-core/lib/octocore/schedeuleable.rb index 931e1f4..6e2d52b 100644 --- a/octo-core/lib/octocore/schedeuleable.rb +++ b/octo-core/lib/octocore/schedeuleable.rb @@ -17,4 +17,4 @@ def perform(*args) end end -end \ No newline at end of file +end diff --git a/octo-core/lib/octocore/search/client.rb b/octo-core/lib/octocore/search/client.rb index 6bb6737..51544c0 100644 --- a/octo-core/lib/octocore/search/client.rb +++ b/octo-core/lib/octocore/search/client.rb @@ -11,22 +11,31 @@ class Client include Elasticsearch::API - CONNECTION = ::Faraday::Connection.new(url: Octo.get_config(:search)[:server]) - # Low level method for performing a request to Elastic Search cluster # @param [String] method The method ex: get, put, post, etc.. # @param [String] path The path of the request # @param [Hash] params The params of the request # @param [String] body The body of the request def perform_request(method, path, params, body) - Octo.logger.debug "--> #{method.upcase} #{path} #{params} #{body}" + Octo.logger.info "--> #{method.upcase} #{path} #{params} #{body}" - CONNECTION.run_request \ + response = connection.run_request \ method.downcase.to_sym, path, ( body ? MultiJson.dump(body): nil ), {'Content-Type' => 'application/json'} + + Octo.logger.info "#{ response.status }, #{ response.body }" end + + # Creates a new connection to the server if not already done + def connection + unless @connection + @connection = ::Faraday::Connection.new(url: Octo.get_config(:search)[:server]) + end + @connection + end + end end end diff --git a/octo-core/lib/octocore/search/indexer.rb b/octo-core/lib/octocore/search/indexer.rb index e69de29..1bc50b9 100644 --- a/octo-core/lib/octocore/search/indexer.rb +++ b/octo-core/lib/octocore/search/indexer.rb @@ -0,0 +1,29 @@ +require 'octocore/search/client' + +module Octo + module Search + + # Its a Resque worker that sends index calls + # + class Indexer + + # The queue on which the job shall be performed + # + @queue = :indexer_queue + + def self.perform(action, data) + _data = data.deep_symbolize_keys + indexname = _data.delete(:index) + self.client.index index: indexname, + type: indexname, + id: _data[:body][:id], + body: _data.delete(:body) + end + + def self.client + @client ||= Octo::Search::Client.new + end + + end + end +end diff --git a/octo-core/lib/octocore/search/searchable.rb b/octo-core/lib/octocore/search/searchable.rb index 3bc25df..141a21b 100644 --- a/octo-core/lib/octocore/search/searchable.rb +++ b/octo-core/lib/octocore/search/searchable.rb @@ -1,18 +1,29 @@ module Octo - module Searchable + module Search + module Searchable - # Gets the search client - def searchclient - unless @searchClient - @searchClient = Octo::Search::Client.new + def self.included(base) + [:save, :create, :update].each do |mtd| + base.send("after_#{ mtd }", lambda { async_index indexed_json }) + end + base.after_destroy { async_delete id } end - @searchClient - end - # Defines the indice with which this would be indexed - def indexable_with(indice_name, type) + def async_index(data) + _data = { index: index_name, + body: data } + Resque.enqueue(Octo::Search::Indexer, :index, _data) + end - end + def async_delete(id) + Resque.enqueue(Octo::Search::Indexer, :delete, id) + end + def index_name + @index_name ||= self.class.to_s.split(/::/).last.downcase + end + + end end end + diff --git a/octo-core/lib/octocore/search/setup.rb b/octo-core/lib/octocore/search/setup.rb index 2a47298..f7f823a 100644 --- a/octo-core/lib/octocore/search/setup.rb +++ b/octo-core/lib/octocore/search/setup.rb @@ -1,5 +1,7 @@ module Octo + module Search + # Setup module for ElasticSearch module Setup @@ -67,5 +69,6 @@ class Update end + end end diff --git a/octo-core/lib/octocore/utils.rb b/octo-core/lib/octocore/utils.rb index c6b2102..edd541e 100644 --- a/octo-core/lib/octocore/utils.rb +++ b/octo-core/lib/octocore/utils.rb @@ -88,3 +88,18 @@ def deep_merge(second) self.merge(second.to_h, &merger) end end + + +class ::Array + + def as_geopoint + if self.count == 2 + { + lat: self[0], + lon: self[1] + } + else + raise ArgumentError, 'Cannot convert as geopoint' + end + end +end diff --git a/octo-core/octocore.gemspec b/octo-core/octocore.gemspec index 9d9254e..3f1e238 100644 --- a/octo-core/octocore.gemspec +++ b/octo-core/octocore.gemspec @@ -37,6 +37,7 @@ DESC s.add_runtime_dependency 'ruby-kafka', '~> 0.3.2', '>= 0.3.2' s.add_runtime_dependency 'elasticsearch', '~> 1.0.17', '>= 1.0.17' s.add_runtime_dependency 'faraday', '~> 0.9.2', '>= 0.9.2' + s.add_runtime_dependency 'elasticsearch-model', '~> 0.1.9', '>= 0.1.9' s.add_development_dependency 'rspec', '~> 3.4.0', '>= 3.4.0' s.add_development_dependency 'parallel_tests', '~> 2.5.0', '>= 2.5.0' diff --git a/octo-notification/octonotification.gemspec b/octo-notification/octonotification.gemspec index 03762f6..8d1446a 100644 --- a/octo-notification/octonotification.gemspec +++ b/octo-notification/octonotification.gemspec @@ -26,4 +26,4 @@ DESC s.add_runtime_dependency 'gcm', '~> 0.1.1', '>= 0.1.0' s.add_runtime_dependency 'apns', '~> 1.0.0', '>= 1.0.0' s.add_runtime_dependency 'aws-sdk', '~> 2.2.35', '>= 2.2.35' -end \ No newline at end of file +end From 20f6e999c83751a294b7d1674476298a67d14fcc Mon Sep 17 00:00:00 2001 From: Pranav Prakash Date: Wed, 27 Jul 2016 17:23:11 +0530 Subject: [PATCH 2/3] Comments, and removing comments --- octo-core/lib/octocore.rb | 3 --- octo-core/lib/octocore/models/user.rb | 30 +++++++++++++++++++++ octo-core/lib/octocore/search/client.rb | 5 ++-- octo-core/lib/octocore/search/indexer.rb | 8 ++++++ octo-core/lib/octocore/search/searchable.rb | 11 ++++++++ octo-core/lib/octocore/utils.rb | 10 +++++-- 6 files changed, 60 insertions(+), 7 deletions(-) diff --git a/octo-core/lib/octocore.rb b/octo-core/lib/octocore.rb index 7391dd2..2eb4dce 100644 --- a/octo-core/lib/octocore.rb +++ b/octo-core/lib/octocore.rb @@ -126,9 +126,6 @@ def self._connect(configuration) require 'octocore/search' self.logger.info('Setting callbacks.') - - #Elasticsearch::Model.client = Octo::Search::Client.new - end # Creates a logger for Octo diff --git a/octo-core/lib/octocore/models/user.rb b/octo-core/lib/octocore/models/user.rb index 02db92a..d173d2a 100644 --- a/octo-core/lib/octocore/models/user.rb +++ b/octo-core/lib/octocore/models/user.rb @@ -32,6 +32,9 @@ class User # Returns the data for indexing purposess. + # @return [Hash] The user object's fields and values represented as a hash + # for indexing purposes + # def indexed_json i = Hash.new i.merge!({ @@ -58,42 +61,69 @@ def indexed_json i end + # Gets the list of device IDs for the user + # @return [Array] Device IDs for the user + # def device_ids user_phone_details.collect { |x| x.deviceid } end + # Gets the list of device manufacturers for the set of devices that user + # has + # @return [Array] The array of device manufacturers + # def device_manufacturers user_phone_details.collect { |x| x.manufacturer } end + # Gets the list of device models for the user + # @return [Array] The array of device models for the user + # def device_models user_phone_details.collect { |x| x.model } end + # Gets the browsers for the user + # @return [Array] Array of browsers + # def browsers [] end + # Gets the list of OSs for the user + # @return [Array] List of OSs of the user + # def os [] end + # Gets the engagement class of the user + # @return [Fixnum] The engagement class of user. Defaults to 1 + # def engagement 1 end + # Returns the home location of user as a geopoint data type + # @return [Hash] The home location of user + # def home_location if home_location_lat and home_location_lon [home_location_lat, home_location_lon].as_geopoint end end + # Gets the work location of user as a geopoint data type + # @return [Hash] The work location of user + # def work_location if work_location_lat and work_location_lon [work_location_lat, work_location_lon].as_geopoint end end + # Gets the time slots for which the user is active + # @return [Array] The time slots of user def time_slots [] end diff --git a/octo-core/lib/octocore/search/client.rb b/octo-core/lib/octocore/search/client.rb index 51544c0..0fcf454 100644 --- a/octo-core/lib/octocore/search/client.rb +++ b/octo-core/lib/octocore/search/client.rb @@ -17,7 +17,7 @@ class Client # @param [Hash] params The params of the request # @param [String] body The body of the request def perform_request(method, path, params, body) - Octo.logger.info "--> #{method.upcase} #{path} #{params} #{body}" + Octo.logger.debug "--> #{method.upcase} #{path} #{params} #{body}" response = connection.run_request \ method.downcase.to_sym, @@ -25,7 +25,8 @@ def perform_request(method, path, params, body) ( body ? MultiJson.dump(body): nil ), {'Content-Type' => 'application/json'} - Octo.logger.info "#{ response.status }, #{ response.body }" + Octo.logger.debug "#{ response.status }, #{ response.body }" + response end # Creates a new connection to the server if not already done diff --git a/octo-core/lib/octocore/search/indexer.rb b/octo-core/lib/octocore/search/indexer.rb index 1bc50b9..752c377 100644 --- a/octo-core/lib/octocore/search/indexer.rb +++ b/octo-core/lib/octocore/search/indexer.rb @@ -11,6 +11,11 @@ class Indexer # @queue = :indexer_queue + # Performs the indexing part + # @param [String] action The action to be performed. Namely `index` or + # `delete` + # @param [Hash] data The data to be used while performing the action + # def self.perform(action, data) _data = data.deep_symbolize_keys indexname = _data.delete(:index) @@ -20,6 +25,9 @@ def self.perform(action, data) body: _data.delete(:body) end + # Gets the search client for the indexer. + # @return [Octo::Search::Client] A search client + # def self.client @client ||= Octo::Search::Client.new end diff --git a/octo-core/lib/octocore/search/searchable.rb b/octo-core/lib/octocore/search/searchable.rb index 141a21b..ebc1de3 100644 --- a/octo-core/lib/octocore/search/searchable.rb +++ b/octo-core/lib/octocore/search/searchable.rb @@ -2,6 +2,9 @@ module Octo module Search module Searchable + # Extend the class with callbacks for indexing the document or deleting + # it based on appropriate events + # def self.included(base) [:save, :create, :update].each do |mtd| base.send("after_#{ mtd }", lambda { async_index indexed_json }) @@ -9,16 +12,24 @@ def self.included(base) base.after_destroy { async_delete id } end + # Perform an async indexing of the data + # @param [Hash] data The data to be indexed def async_index(data) _data = { index: index_name, body: data } Resque.enqueue(Octo::Search::Indexer, :index, _data) end + # Perform async delete of the document from index. + # @param [Fixnum] id The ID of the document to be removed from the index + # def async_delete(id) Resque.enqueue(Octo::Search::Indexer, :delete, id) end + # Helper module for getting the index name corresponding to the class + # @return [String] The name of the index to be used in elasticsearch + # def index_name @index_name ||= self.class.to_s.split(/::/).last.downcase end diff --git a/octo-core/lib/octocore/utils.rb b/octo-core/lib/octocore/utils.rb index edd541e..b21a74a 100644 --- a/octo-core/lib/octocore/utils.rb +++ b/octo-core/lib/octocore/utils.rb @@ -83,6 +83,8 @@ def to_slug end class ::Hash + + # Deep merge a hash with another hash def deep_merge(second) merger = proc { |key, v1, v2| Hash === v1 && Hash === v2 ? v1.merge(v2, &merger) : Array === v1 && Array === v2 ? v1 | v2 : [:undefined, nil, :nil].include?(v2) ? v1 : v2 } self.merge(second.to_h, &merger) @@ -92,11 +94,15 @@ def deep_merge(second) class ::Array + # Returns an array of float as a geopoint + # @return [Hash] A hash with keys as `lat` and `lon` with + # their corresponding values. This is the convenient + # wrapper for elasticsearch data type def as_geopoint if self.count == 2 { - lat: self[0], - lon: self[1] + lat: self[0].to_f, + lon: self[1].to_f } else raise ArgumentError, 'Cannot convert as geopoint' From 0d99ea5312833bdeab7ded9c5012a073846bf8f0 Mon Sep 17 00:00:00 2001 From: Pranav Prakash Date: Thu, 28 Jul 2016 17:25:10 +0530 Subject: [PATCH 3/3] Wraping over Elasticsearch Model --- octo-core/lib/octocore/models/user.rb | 21 +++++- octo-core/lib/octocore/search.rb | 32 ++++++++- octo-core/lib/octocore/search/indexer.rb | 25 ++++--- octo-core/lib/octocore/search/searchable.rb | 41 +++++++----- octo-core/lib/octocore/search/setup.rb | 74 --------------------- 5 files changed, 91 insertions(+), 102 deletions(-) delete mode 100644 octo-core/lib/octocore/search/setup.rb diff --git a/octo-core/lib/octocore/models/user.rb b/octo-core/lib/octocore/models/user.rb index d173d2a..fbfe8d8 100644 --- a/octo-core/lib/octocore/models/user.rb +++ b/octo-core/lib/octocore/models/user.rb @@ -30,12 +30,31 @@ class User has_many :user_browser_details, class_name: 'Octo::UserBrowserDetails' has_many :user_personas, class_name: 'Octo::UserPersona' + settings index: { number_of_shards: 1 } do + mappings dynamic: 'false' do + indexes :enterpriseid, analyzer: 'keyword', index_options: 'offsets' + indexes :id, type: :integer + indexes :last_login, type: :date + indexes :curr_state, type: :string + indexes :city, type: :string + indexes :state, type: :string + indexes :country, type: :string + indexes :os, type: :nested + indexes :browser, type: :nested + indexes :engagement, type: :integer + indexes :home_location, type: :geo_point + indexes :work_location, type: :geo_point + indexes :persona, type: :nested + indexes :time_slots, type: :nested + end + end + # Returns the data for indexing purposess. # @return [Hash] The user object's fields and values represented as a hash # for indexing purposes # - def indexed_json + def as_indexed_json(options = {}) i = Hash.new i.merge!({ id: id, diff --git a/octo-core/lib/octocore/search.rb b/octo-core/lib/octocore/search.rb index 2ff8999..b77e3ac 100644 --- a/octo-core/lib/octocore/search.rb +++ b/octo-core/lib/octocore/search.rb @@ -1,5 +1,35 @@ +require 'elasticsearch/model' + require 'octocore/search/client' -require 'octocore/search/setup' require 'octocore/search/indexer' require 'octocore/search/searchable' +module Octo + + module Search + + class InitConnection + + # Connects to the elasticsearch cluster and validates the presence + # of indexes for all the classes which have included + # `Octo::Search::Searchable` + # + def self.connect + if Octo.get_config(:search) and Octo.get_config(:search)[:server] + Elasticsearch::Model.client = Octo::Search::Client.new + + klasses = ObjectSpace.each_object(Class).select do |c| + c.included_modules.include? Octo::Search::Searchable + end + klasses.each { |k| k.__elasticsearch__.create_index! } + end + end + + end + end +end + +# Perform connection to Elasticsearch when required +# +Octo::Search::InitConnection.connect + diff --git a/octo-core/lib/octocore/search/indexer.rb b/octo-core/lib/octocore/search/indexer.rb index 752c377..14ac190 100644 --- a/octo-core/lib/octocore/search/indexer.rb +++ b/octo-core/lib/octocore/search/indexer.rb @@ -14,15 +14,24 @@ class Indexer # Performs the indexing part # @param [String] action The action to be performed. Namely `index` or # `delete` - # @param [Hash] data The data to be used while performing the action + # @param [Hash] opts The data to be used while performing the action + # @option opts [String] idx_name The index name to be used + # @option opts [String] doc_type The document type of the document + # @option opts [String] id The ID of the document + # @option opts [Hash] body The body of the document to be indexed # - def self.perform(action, data) - _data = data.deep_symbolize_keys - indexname = _data.delete(:index) - self.client.index index: indexname, - type: indexname, - id: _data[:body][:id], - body: _data.delete(:body) + def self.perform(action, opts={}) + action = action.to_sym + if action == :index + self.client.index index: opts['idx_name'], + type: opts['doc_type'], + id: opts['body']['id'], + body: opts['body'] + elsif action == :delete + self.client.delete index: opts['idx_name'], + type: opts['doc_type'], + id: opts['id'] + end end # Gets the search client for the indexer. diff --git a/octo-core/lib/octocore/search/searchable.rb b/octo-core/lib/octocore/search/searchable.rb index ebc1de3..0157169 100644 --- a/octo-core/lib/octocore/search/searchable.rb +++ b/octo-core/lib/octocore/search/searchable.rb @@ -1,3 +1,5 @@ +require 'elasticsearch/model' + module Octo module Search module Searchable @@ -6,32 +8,35 @@ module Searchable # it based on appropriate events # def self.included(base) - [:save, :create, :update].each do |mtd| - base.send("after_#{ mtd }", lambda { async_index indexed_json }) + base.send(:include, ::Elasticsearch::Model) + [:save].each do |mtd| + base.send("after_#{ mtd }", lambda { async_index self }) end - base.after_destroy { async_delete id } + base.after_destroy { async_delete self } end # Perform an async indexing of the data - # @param [Hash] data The data to be indexed - def async_index(data) - _data = { index: index_name, - body: data } - Resque.enqueue(Octo::Search::Indexer, :index, _data) - end - - # Perform async delete of the document from index. - # @param [Fixnum] id The ID of the document to be removed from the index + # @param [Object] model The model instance to be indexed # - def async_delete(id) - Resque.enqueue(Octo::Search::Indexer, :delete, id) + def async_index(model) + opts = { + idx_name: model.__elasticsearch__.index_name, + doc_type: model.__elasticsearch__.document_type, + body: model.as_indexed_json + } + Resque.enqueue(Octo::Search::Indexer, :index, opts) end - # Helper module for getting the index name corresponding to the class - # @return [String] The name of the index to be used in elasticsearch + # Perform async delete of the document from index. + # @param [Object] model The model instance to be indexed # - def index_name - @index_name ||= self.class.to_s.split(/::/).last.downcase + def async_delete(model) + opts = { + idx_name: model.__elasticsearch__.index_name, + doc_type: model.__elasticsearch__.document_type, + id: model.id + } + Resque.enqueue(Octo::Search::Indexer, :delete, opts) end end diff --git a/octo-core/lib/octocore/search/setup.rb b/octo-core/lib/octocore/search/setup.rb deleted file mode 100644 index f7f823a..0000000 --- a/octo-core/lib/octocore/search/setup.rb +++ /dev/null @@ -1,74 +0,0 @@ -module Octo - - module Search - - # Setup module for ElasticSearch - module Setup - - # Creates the necessary indices - class Create - - def self.perform - sclient = Octo::Search::Client.new - sconfig = Octo.get_config(:search) - - # Set the cluster disk space thresholds first. That's necessary - # because the defaults are too less for development machines. So, - # in order to keep it moving, we set a lower threshold in development. - # Refer - # https://www.elastic.co/guide/en/elasticsearch/reference/current/disk-allocator.html - if sconfig.has_key?(:disk_threshold_low) and sconfig.has_key?(:disk_threshold_high) - cluster_settings = { - body: { - persistent: { - 'cluster.routing.allocation.disk.threshold_enabled' => true, - 'cluster.routing.allocation.disk.watermark.low' => sconfig[:disk_threshold_low], - 'cluster.routing.allocation.disk.watermark.high' => sconfig[:disk_threshold_high], - 'cluster.info.update.interval' => '60s' - } - } - } - sclient.cluster.put_settings cluster_settings - end - - # Check if any indices specified exists. If not exists, create them - sconfig[:index].keys.each do |index_name| - args = { index: index_name } - if sclient.indices.exists?(args) - Octo.logger.info "Search Index: #{ index_name } exists." - else - Octo.logger.warn "Search Index: #{ index_name } DOES NOT EXIST." - Octo.logger.info "Creating Index: #{ index_name }" - create_args = { - index: index_name, - body: sconfig[:index][index_name] - } - sclient.indices.create create_args - end - end - - # Also check if there are any indices present that should not be - # present - _indices = JSON.parse(sclient.cluster.state)['metadata']['indices']. - keys.map(&:to_sym) - extra_indices = _indices - sconfig[:index].keys - Octo.logger.warn "Found extra indices: #{ extra_indices }" - end - end - - # Updates the indices. - # The major differene between this and the Create is that while create - # just checks for the existance by name, and passes if the name is found - # This actually overwrites all the mappings, properties, warmers etc - # So, this should be used only when we need to explicitly "UPDATE" the - # index. - class Update - - end - - - end - - end - -end