diff --git a/octo-core/Gemfile.lock b/octo-core/Gemfile.lock index 05aaf57..9149e0c 100644 --- a/octo-core/Gemfile.lock +++ b/octo-core/Gemfile.lock @@ -5,6 +5,7 @@ PATH cequel (~> 1.9, >= 1.9.0) descriptive_statistics (~> 2.5.1, >= 2.5.0) elasticsearch (~> 1.0.17, >= 1.0.17) + elasticsearch-model (~> 0.1.9, >= 0.1.9) faraday (~> 0.9.2, >= 0.9.2) hiredis (~> 0.6.1, >= 0.6.0) hooks (~> 0.4.1, >= 0.4.1) @@ -41,11 +42,16 @@ GEM elasticsearch-transport (= 1.0.18) elasticsearch-api (1.0.18) multi_json + elasticsearch-model (0.1.9) + activesupport (> 3) + elasticsearch (> 0.4) + hashie elasticsearch-transport (1.0.18) faraday multi_json faraday (0.9.2) multipart-post (>= 1.2, < 3) + hashie (3.4.4) hiredis (0.6.1) hooks (0.4.1) uber (~> 0.0.14) @@ -90,7 +96,7 @@ GEM diff-lcs (>= 1.2.0, < 2.0) rspec-support (~> 3.4.0) rspec-support (3.4.1) - ruby-kafka (0.3.10) + ruby-kafka (0.3.11) rufus-scheduler (3.2.1) sinatra (1.4.7) rack (~> 1.5) diff --git a/octo-core/bin/clean_build.sh b/octo-core/bin/clean_build.sh index 42e6435..99258ec 100755 --- a/octo-core/bin/clean_build.sh +++ b/octo-core/bin/clean_build.sh @@ -1,2 +1,11 @@ +#!/bin/bash + +# Remove any existing gem present rm *.gem -gem build octocore.gemspec && gem uninstall octocore --force && gem install octocore-0.0.1.gem + +# Build the gem +gem build octocore.gemspec && gem uninstall octocore --force + + +# Install it +find . -name "*.gem" | xargs gem install diff --git a/octo-core/lib/octocore.rb b/octo-core/lib/octocore.rb index df6024e..2eb4dce 100644 --- a/octo-core/lib/octocore.rb +++ b/octo-core/lib/octocore.rb @@ -126,7 +126,6 @@ def self._connect(configuration) require 'octocore/search' self.logger.info('Setting callbacks.') - end # Creates a logger for Octo @@ -140,3 +139,4 @@ def self.logger end end + diff --git a/octo-core/lib/octocore/models/user.rb b/octo-core/lib/octocore/models/user.rb index 79aa6d1..fbfe8d8 100644 --- a/octo-core/lib/octocore/models/user.rb +++ b/octo-core/lib/octocore/models/user.rb @@ -1,21 +1,151 @@ require 'cequel' +require 'octocore/search/searchable' module Octo class User include Cequel::Record + include Octo::Search::Searchable belongs_to :enterprise, class_name: 'Octo::Enterprise' key :id, :bigint + column :last_login, :timestamp + column :curr_city, :text + column :curr_state, :text + column :curr_country, :text + + column :home_location_lat, :float + column :home_location_lon, :float + + column :work_location_lat, :float + column :work_location_lon, :float + timestamps # Define the associations - has_many :user_location_histories - has_many :user_phone_details - has_many :push_token - has_many :user_browser_details - has_many :user_personas + has_many :user_location_histories, class_name: 'Octo::UserLocationHistory' + has_many :user_phone_details, class_name: 'Octo::UserPhoneDetails' + has_many :push_token, class_name: 'Octo::PushToken' + has_many :user_browser_details, class_name: 'Octo::UserBrowserDetails' + has_many :user_personas, class_name: 'Octo::UserPersona' + + settings index: { number_of_shards: 1 } do + mappings dynamic: 'false' do + indexes :enterpriseid, analyzer: 'keyword', index_options: 'offsets' + indexes :id, type: :integer + indexes :last_login, type: :date + indexes :curr_state, type: :string + indexes :city, type: :string + indexes :state, type: :string + indexes :country, type: :string + indexes :os, type: :nested + indexes :browser, type: :nested + indexes :engagement, type: :integer + indexes :home_location, type: :geo_point + indexes :work_location, type: :geo_point + indexes :persona, type: :nested + indexes :time_slots, type: :nested + end + end + + + # Returns the data for indexing purposess. + # @return [Hash] The user object's fields and values represented as a hash + # for indexing purposes + # + def as_indexed_json(options = {}) + i = Hash.new + i.merge!({ + id: id, + userid: id, + enterpriseid: enterprise.id.to_s, + created: created_at, + updated: updated_at, + last_login: last_login, + device_id: device_ids, + manufacturer: device_manufacturers, + model: device_models + }) + i[:city] = curr_city if curr_city + i[:country] = curr_country if curr_country + i[:state] = curr_state if curr_state + i[:os] = os if os + i[:browser] = browsers if browsers.count > 0 + i[:engagement] = engagement if engagement + i[:home_location] = home_location if home_location + i[:work_location] = work_location if work_location + i[:persona] = user_personas if user_personas.count > 0 + i[:time_slots] = time_slots if time_slots.count > 0 + i + end + + # Gets the list of device IDs for the user + # @return [Array] Device IDs for the user + # + def device_ids + user_phone_details.collect { |x| x.deviceid } + end + + # Gets the list of device manufacturers for the set of devices that user + # has + # @return [Array] The array of device manufacturers + # + def device_manufacturers + user_phone_details.collect { |x| x.manufacturer } + end + + # Gets the list of device models for the user + # @return [Array] The array of device models for the user + # + def device_models + user_phone_details.collect { |x| x.model } + end + + # Gets the browsers for the user + # @return [Array] Array of browsers + # + def browsers + [] + end + + # Gets the list of OSs for the user + # @return [Array] List of OSs of the user + # + def os + [] + end + + # Gets the engagement class of the user + # @return [Fixnum] The engagement class of user. Defaults to 1 + # + def engagement + 1 + end + + # Returns the home location of user as a geopoint data type + # @return [Hash] The home location of user + # + def home_location + if home_location_lat and home_location_lon + [home_location_lat, home_location_lon].as_geopoint + end + end + + # Gets the work location of user as a geopoint data type + # @return [Hash] The work location of user + # + def work_location + if work_location_lat and work_location_lon + [work_location_lat, work_location_lon].as_geopoint + end + end + + # Gets the time slots for which the user is active + # @return [Array] The time slots of user + def time_slots + [] + end end end diff --git a/octo-core/lib/octocore/schedeuleable.rb b/octo-core/lib/octocore/schedeuleable.rb index 931e1f4..6e2d52b 100644 --- a/octo-core/lib/octocore/schedeuleable.rb +++ b/octo-core/lib/octocore/schedeuleable.rb @@ -17,4 +17,4 @@ def perform(*args) end end -end \ No newline at end of file +end diff --git a/octo-core/lib/octocore/search.rb b/octo-core/lib/octocore/search.rb index 2ff8999..b77e3ac 100644 --- a/octo-core/lib/octocore/search.rb +++ b/octo-core/lib/octocore/search.rb @@ -1,5 +1,35 @@ +require 'elasticsearch/model' + require 'octocore/search/client' -require 'octocore/search/setup' require 'octocore/search/indexer' require 'octocore/search/searchable' +module Octo + + module Search + + class InitConnection + + # Connects to the elasticsearch cluster and validates the presence + # of indexes for all the classes which have included + # `Octo::Search::Searchable` + # + def self.connect + if Octo.get_config(:search) and Octo.get_config(:search)[:server] + Elasticsearch::Model.client = Octo::Search::Client.new + + klasses = ObjectSpace.each_object(Class).select do |c| + c.included_modules.include? Octo::Search::Searchable + end + klasses.each { |k| k.__elasticsearch__.create_index! } + end + end + + end + end +end + +# Perform connection to Elasticsearch when required +# +Octo::Search::InitConnection.connect + diff --git a/octo-core/lib/octocore/search/client.rb b/octo-core/lib/octocore/search/client.rb index 6bb6737..0fcf454 100644 --- a/octo-core/lib/octocore/search/client.rb +++ b/octo-core/lib/octocore/search/client.rb @@ -11,8 +11,6 @@ class Client include Elasticsearch::API - CONNECTION = ::Faraday::Connection.new(url: Octo.get_config(:search)[:server]) - # Low level method for performing a request to Elastic Search cluster # @param [String] method The method ex: get, put, post, etc.. # @param [String] path The path of the request @@ -21,12 +19,24 @@ class Client def perform_request(method, path, params, body) Octo.logger.debug "--> #{method.upcase} #{path} #{params} #{body}" - CONNECTION.run_request \ + response = connection.run_request \ method.downcase.to_sym, path, ( body ? MultiJson.dump(body): nil ), {'Content-Type' => 'application/json'} + + Octo.logger.debug "#{ response.status }, #{ response.body }" + response end + + # Creates a new connection to the server if not already done + def connection + unless @connection + @connection = ::Faraday::Connection.new(url: Octo.get_config(:search)[:server]) + end + @connection + end + end end end diff --git a/octo-core/lib/octocore/search/indexer.rb b/octo-core/lib/octocore/search/indexer.rb index e69de29..14ac190 100644 --- a/octo-core/lib/octocore/search/indexer.rb +++ b/octo-core/lib/octocore/search/indexer.rb @@ -0,0 +1,46 @@ +require 'octocore/search/client' + +module Octo + module Search + + # Its a Resque worker that sends index calls + # + class Indexer + + # The queue on which the job shall be performed + # + @queue = :indexer_queue + + # Performs the indexing part + # @param [String] action The action to be performed. Namely `index` or + # `delete` + # @param [Hash] opts The data to be used while performing the action + # @option opts [String] idx_name The index name to be used + # @option opts [String] doc_type The document type of the document + # @option opts [String] id The ID of the document + # @option opts [Hash] body The body of the document to be indexed + # + def self.perform(action, opts={}) + action = action.to_sym + if action == :index + self.client.index index: opts['idx_name'], + type: opts['doc_type'], + id: opts['body']['id'], + body: opts['body'] + elsif action == :delete + self.client.delete index: opts['idx_name'], + type: opts['doc_type'], + id: opts['id'] + end + end + + # Gets the search client for the indexer. + # @return [Octo::Search::Client] A search client + # + def self.client + @client ||= Octo::Search::Client.new + end + + end + end +end diff --git a/octo-core/lib/octocore/search/searchable.rb b/octo-core/lib/octocore/search/searchable.rb index 3bc25df..0157169 100644 --- a/octo-core/lib/octocore/search/searchable.rb +++ b/octo-core/lib/octocore/search/searchable.rb @@ -1,18 +1,45 @@ +require 'elasticsearch/model' + module Octo - module Searchable + module Search + module Searchable - # Gets the search client - def searchclient - unless @searchClient - @searchClient = Octo::Search::Client.new + # Extend the class with callbacks for indexing the document or deleting + # it based on appropriate events + # + def self.included(base) + base.send(:include, ::Elasticsearch::Model) + [:save].each do |mtd| + base.send("after_#{ mtd }", lambda { async_index self }) + end + base.after_destroy { async_delete self } end - @searchClient - end - # Defines the indice with which this would be indexed - def indexable_with(indice_name, type) + # Perform an async indexing of the data + # @param [Object] model The model instance to be indexed + # + def async_index(model) + opts = { + idx_name: model.__elasticsearch__.index_name, + doc_type: model.__elasticsearch__.document_type, + body: model.as_indexed_json + } + Resque.enqueue(Octo::Search::Indexer, :index, opts) + end - end + # Perform async delete of the document from index. + # @param [Object] model The model instance to be indexed + # + def async_delete(model) + opts = { + idx_name: model.__elasticsearch__.index_name, + doc_type: model.__elasticsearch__.document_type, + id: model.id + } + Resque.enqueue(Octo::Search::Indexer, :delete, opts) + end + end end end + diff --git a/octo-core/lib/octocore/search/setup.rb b/octo-core/lib/octocore/search/setup.rb deleted file mode 100644 index 2a47298..0000000 --- a/octo-core/lib/octocore/search/setup.rb +++ /dev/null @@ -1,71 +0,0 @@ -module Octo - - # Setup module for ElasticSearch - module Setup - - # Creates the necessary indices - class Create - - def self.perform - sclient = Octo::Search::Client.new - sconfig = Octo.get_config(:search) - - # Set the cluster disk space thresholds first. That's necessary - # because the defaults are too less for development machines. So, - # in order to keep it moving, we set a lower threshold in development. - # Refer - # https://www.elastic.co/guide/en/elasticsearch/reference/current/disk-allocator.html - if sconfig.has_key?(:disk_threshold_low) and sconfig.has_key?(:disk_threshold_high) - cluster_settings = { - body: { - persistent: { - 'cluster.routing.allocation.disk.threshold_enabled' => true, - 'cluster.routing.allocation.disk.watermark.low' => sconfig[:disk_threshold_low], - 'cluster.routing.allocation.disk.watermark.high' => sconfig[:disk_threshold_high], - 'cluster.info.update.interval' => '60s' - } - } - } - sclient.cluster.put_settings cluster_settings - end - - # Check if any indices specified exists. If not exists, create them - sconfig[:index].keys.each do |index_name| - args = { index: index_name } - if sclient.indices.exists?(args) - Octo.logger.info "Search Index: #{ index_name } exists." - else - Octo.logger.warn "Search Index: #{ index_name } DOES NOT EXIST." - Octo.logger.info "Creating Index: #{ index_name }" - create_args = { - index: index_name, - body: sconfig[:index][index_name] - } - sclient.indices.create create_args - end - end - - # Also check if there are any indices present that should not be - # present - _indices = JSON.parse(sclient.cluster.state)['metadata']['indices']. - keys.map(&:to_sym) - extra_indices = _indices - sconfig[:index].keys - Octo.logger.warn "Found extra indices: #{ extra_indices }" - end - end - - # Updates the indices. - # The major differene between this and the Create is that while create - # just checks for the existance by name, and passes if the name is found - # This actually overwrites all the mappings, properties, warmers etc - # So, this should be used only when we need to explicitly "UPDATE" the - # index. - class Update - - end - - - end - - -end diff --git a/octo-core/lib/octocore/utils.rb b/octo-core/lib/octocore/utils.rb index c6b2102..b21a74a 100644 --- a/octo-core/lib/octocore/utils.rb +++ b/octo-core/lib/octocore/utils.rb @@ -83,8 +83,29 @@ def to_slug end class ::Hash + + # Deep merge a hash with another hash def deep_merge(second) merger = proc { |key, v1, v2| Hash === v1 && Hash === v2 ? v1.merge(v2, &merger) : Array === v1 && Array === v2 ? v1 | v2 : [:undefined, nil, :nil].include?(v2) ? v1 : v2 } self.merge(second.to_h, &merger) end end + + +class ::Array + + # Returns an array of float as a geopoint + # @return [Hash] A hash with keys as `lat` and `lon` with + # their corresponding values. This is the convenient + # wrapper for elasticsearch data type + def as_geopoint + if self.count == 2 + { + lat: self[0].to_f, + lon: self[1].to_f + } + else + raise ArgumentError, 'Cannot convert as geopoint' + end + end +end diff --git a/octo-core/octocore.gemspec b/octo-core/octocore.gemspec index 9d9254e..3f1e238 100644 --- a/octo-core/octocore.gemspec +++ b/octo-core/octocore.gemspec @@ -37,6 +37,7 @@ DESC s.add_runtime_dependency 'ruby-kafka', '~> 0.3.2', '>= 0.3.2' s.add_runtime_dependency 'elasticsearch', '~> 1.0.17', '>= 1.0.17' s.add_runtime_dependency 'faraday', '~> 0.9.2', '>= 0.9.2' + s.add_runtime_dependency 'elasticsearch-model', '~> 0.1.9', '>= 0.1.9' s.add_development_dependency 'rspec', '~> 3.4.0', '>= 3.4.0' s.add_development_dependency 'parallel_tests', '~> 2.5.0', '>= 2.5.0' diff --git a/octo-notification/octonotification.gemspec b/octo-notification/octonotification.gemspec index 03762f6..8d1446a 100644 --- a/octo-notification/octonotification.gemspec +++ b/octo-notification/octonotification.gemspec @@ -26,4 +26,4 @@ DESC s.add_runtime_dependency 'gcm', '~> 0.1.1', '>= 0.1.0' s.add_runtime_dependency 'apns', '~> 1.0.0', '>= 1.0.0' s.add_runtime_dependency 'aws-sdk', '~> 2.2.35', '>= 2.2.35' -end \ No newline at end of file +end