Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First draft of elasticsearch integration #10

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion octo-core/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ PATH
cequel (~> 1.9, >= 1.9.0)
descriptive_statistics (~> 2.5.1, >= 2.5.0)
elasticsearch (~> 1.0.17, >= 1.0.17)
elasticsearch-model (~> 0.1.9, >= 0.1.9)
faraday (~> 0.9.2, >= 0.9.2)
hiredis (~> 0.6.1, >= 0.6.0)
hooks (~> 0.4.1, >= 0.4.1)
Expand Down Expand Up @@ -41,11 +42,16 @@ GEM
elasticsearch-transport (= 1.0.18)
elasticsearch-api (1.0.18)
multi_json
elasticsearch-model (0.1.9)
activesupport (> 3)
elasticsearch (> 0.4)
hashie
elasticsearch-transport (1.0.18)
faraday
multi_json
faraday (0.9.2)
multipart-post (>= 1.2, < 3)
hashie (3.4.4)
hiredis (0.6.1)
hooks (0.4.1)
uber (~> 0.0.14)
Expand Down Expand Up @@ -90,7 +96,7 @@ GEM
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.4.0)
rspec-support (3.4.1)
ruby-kafka (0.3.10)
ruby-kafka (0.3.11)
rufus-scheduler (3.2.1)
sinatra (1.4.7)
rack (~> 1.5)
Expand Down
11 changes: 10 additions & 1 deletion octo-core/bin/clean_build.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
#!/bin/bash

# Remove any existing gem present
rm *.gem
gem build octocore.gemspec && gem uninstall octocore --force && gem install octocore-0.0.1.gem

# Build the gem
gem build octocore.gemspec && gem uninstall octocore --force


# Install it
find . -name "*.gem" | xargs gem install
2 changes: 1 addition & 1 deletion octo-core/lib/octocore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ def self._connect(configuration)
require 'octocore/search'

self.logger.info('Setting callbacks.')

end

# Creates a logger for Octo
Expand All @@ -140,3 +139,4 @@ def self.logger
end

end

140 changes: 135 additions & 5 deletions octo-core/lib/octocore/models/user.rb
Original file line number Diff line number Diff line change
@@ -1,21 +1,151 @@
require 'cequel'
require 'octocore/search/searchable'

module Octo
class User
include Cequel::Record
include Octo::Search::Searchable

belongs_to :enterprise, class_name: 'Octo::Enterprise'

key :id, :bigint

column :last_login, :timestamp
column :curr_city, :text
column :curr_state, :text
column :curr_country, :text

column :home_location_lat, :float
column :home_location_lon, :float

column :work_location_lat, :float
column :work_location_lon, :float

timestamps

# Define the associations
has_many :user_location_histories
has_many :user_phone_details
has_many :push_token
has_many :user_browser_details
has_many :user_personas
has_many :user_location_histories, class_name: 'Octo::UserLocationHistory'
has_many :user_phone_details, class_name: 'Octo::UserPhoneDetails'
has_many :push_token, class_name: 'Octo::PushToken'
has_many :user_browser_details, class_name: 'Octo::UserBrowserDetails'
has_many :user_personas, class_name: 'Octo::UserPersona'

settings index: { number_of_shards: 1 } do
mappings dynamic: 'false' do
indexes :enterpriseid, analyzer: 'keyword', index_options: 'offsets'
indexes :id, type: :integer
indexes :last_login, type: :date
indexes :curr_state, type: :string
indexes :city, type: :string
indexes :state, type: :string
indexes :country, type: :string
indexes :os, type: :nested
indexes :browser, type: :nested
indexes :engagement, type: :integer
indexes :home_location, type: :geo_point
indexes :work_location, type: :geo_point
indexes :persona, type: :nested
indexes :time_slots, type: :nested
end
end


# Returns the data for indexing purposess.
# @return [Hash] The user object's fields and values represented as a hash
# for indexing purposes
#
def as_indexed_json(options = {})
i = Hash.new
i.merge!({
id: id,
userid: id,
enterpriseid: enterprise.id.to_s,
created: created_at,
updated: updated_at,
last_login: last_login,
device_id: device_ids,
manufacturer: device_manufacturers,
model: device_models
})
i[:city] = curr_city if curr_city
i[:country] = curr_country if curr_country
i[:state] = curr_state if curr_state
i[:os] = os if os
i[:browser] = browsers if browsers.count > 0
i[:engagement] = engagement if engagement
i[:home_location] = home_location if home_location
i[:work_location] = work_location if work_location
i[:persona] = user_personas if user_personas.count > 0
i[:time_slots] = time_slots if time_slots.count > 0
i
end

# Gets the list of device IDs for the user
# @return [Array<String>] Device IDs for the user
#
def device_ids
user_phone_details.collect { |x| x.deviceid }
end

# Gets the list of device manufacturers for the set of devices that user
# has
# @return [Array<String>] The array of device manufacturers
#
def device_manufacturers
user_phone_details.collect { |x| x.manufacturer }
end

# Gets the list of device models for the user
# @return [Array<String>] The array of device models for the user
#
def device_models
user_phone_details.collect { |x| x.model }
end

# Gets the browsers for the user
# @return [Array<String>] Array of browsers
#
def browsers
[]
end

# Gets the list of OSs for the user
# @return [Array<String>] List of OSs of the user
#
def os
[]
end

# Gets the engagement class of the user
# @return [Fixnum] The engagement class of user. Defaults to 1
#
def engagement
1
end

# Returns the home location of user as a geopoint data type
# @return [Hash] The home location of user
#
def home_location
if home_location_lat and home_location_lon
[home_location_lat, home_location_lon].as_geopoint
end
end

# Gets the work location of user as a geopoint data type
# @return [Hash] The work location of user
#
def work_location
if work_location_lat and work_location_lon
[work_location_lat, work_location_lon].as_geopoint
end
end

# Gets the time slots for which the user is active
# @return [Array<Range>] The time slots of user
def time_slots
[]
end

end
end
Expand Down
2 changes: 1 addition & 1 deletion octo-core/lib/octocore/schedeuleable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ def perform(*args)
end

end
end
end
32 changes: 31 additions & 1 deletion octo-core/lib/octocore/search.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,35 @@
require 'elasticsearch/model'

require 'octocore/search/client'
require 'octocore/search/setup'
require 'octocore/search/indexer'
require 'octocore/search/searchable'

module Octo

module Search

class InitConnection

# Connects to the elasticsearch cluster and validates the presence
# of indexes for all the classes which have included
# `Octo::Search::Searchable`
#
def self.connect
if Octo.get_config(:search) and Octo.get_config(:search)[:server]
Elasticsearch::Model.client = Octo::Search::Client.new

klasses = ObjectSpace.each_object(Class).select do |c|
c.included_modules.include? Octo::Search::Searchable
end
klasses.each { |k| k.__elasticsearch__.create_index! }
end
end

end
end
end

# Perform connection to Elasticsearch when required
#
Octo::Search::InitConnection.connect

16 changes: 13 additions & 3 deletions octo-core/lib/octocore/search/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ class Client

include Elasticsearch::API

CONNECTION = ::Faraday::Connection.new(url: Octo.get_config(:search)[:server])

# Low level method for performing a request to Elastic Search cluster
# @param [String] method The method ex: get, put, post, etc..
# @param [String] path The path of the request
Expand All @@ -21,12 +19,24 @@ class Client
def perform_request(method, path, params, body)
Octo.logger.debug "--> #{method.upcase} #{path} #{params} #{body}"

CONNECTION.run_request \
response = connection.run_request \
method.downcase.to_sym,
path,
( body ? MultiJson.dump(body): nil ),
{'Content-Type' => 'application/json'}

Octo.logger.debug "#{ response.status }, #{ response.body }"
response
end

# Creates a new connection to the server if not already done
def connection
unless @connection
@connection = ::Faraday::Connection.new(url: Octo.get_config(:search)[:server])
end
@connection
end

end
end
end
Expand Down
46 changes: 46 additions & 0 deletions octo-core/lib/octocore/search/indexer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
require 'octocore/search/client'

module Octo
module Search

# Its a Resque worker that sends index calls
#
class Indexer

# The queue on which the job shall be performed
#
@queue = :indexer_queue

# Performs the indexing part
# @param [String] action The action to be performed. Namely `index` or
# `delete`
# @param [Hash] opts The data to be used while performing the action
# @option opts [String] idx_name The index name to be used
# @option opts [String] doc_type The document type of the document
# @option opts [String] id The ID of the document
# @option opts [Hash] body The body of the document to be indexed
#
def self.perform(action, opts={})
action = action.to_sym
if action == :index
self.client.index index: opts['idx_name'],
type: opts['doc_type'],
id: opts['body']['id'],
body: opts['body']
elsif action == :delete
self.client.delete index: opts['idx_name'],
type: opts['doc_type'],
id: opts['id']
end
end

# Gets the search client for the indexer.
# @return [Octo::Search::Client] A search client
#
def self.client
@client ||= Octo::Search::Client.new
end

end
end
end
47 changes: 37 additions & 10 deletions octo-core/lib/octocore/search/searchable.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,45 @@
require 'elasticsearch/model'

module Octo
module Searchable
module Search
module Searchable

# Gets the search client
def searchclient
unless @searchClient
@searchClient = Octo::Search::Client.new
# Extend the class with callbacks for indexing the document or deleting
# it based on appropriate events
#
def self.included(base)
base.send(:include, ::Elasticsearch::Model)
[:save].each do |mtd|
base.send("after_#{ mtd }", lambda { async_index self })
end
base.after_destroy { async_delete self }
end
@searchClient
end

# Defines the indice with which this would be indexed
def indexable_with(indice_name, type)
# Perform an async indexing of the data
# @param [Object] model The model instance to be indexed
#
def async_index(model)
opts = {
idx_name: model.__elasticsearch__.index_name,
doc_type: model.__elasticsearch__.document_type,
body: model.as_indexed_json
}
Resque.enqueue(Octo::Search::Indexer, :index, opts)
end

end
# Perform async delete of the document from index.
# @param [Object] model The model instance to be indexed
#
def async_delete(model)
opts = {
idx_name: model.__elasticsearch__.index_name,
doc_type: model.__elasticsearch__.document_type,
id: model.id
}
Resque.enqueue(Octo::Search::Indexer, :delete, opts)
end

end
end
end

Loading