-
Notifications
You must be signed in to change notification settings - Fork 125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Correlation ID as 1st class citizen #346
Comments
https://groups.google.com/forum/#!topic/event-store/pvk2iEBgBtA |
Just as ruby event store automatically fills out the timestimep, it should also automatically fill out |
Also it would nice to have a wrapper for |
|
Should correlation ID and causation ID be individual columns on SQL store? If first class citizens, this would at least make storage and retrieval more efficient. I think of metadata as storage of data for when an individual event is read, whereas I can see querying on the other IDs to get a subset of events. |
@joelvh my plan is to link them to streams https://blog.arkency.com/correlation-id-and-causation-id-in-evented-systems/ |
Fair enough. I think having some of this behavior as options you can toggle on or off would be ideal, such as automatically linking causation and correlation streams. |
@joelvh yep, exactly. I was thinking about an event handler that you can subscribe manually. |
https://blog.arkency.com/correlation-id-and-causation-id-in-evented-systems/ > Let’s say every message has 3 ids. 1 is its id. Another is correlation the last > it causation. If you are responding to a message, you copy its correlation id > as your correlation id, its message id is your causation id. This allows you to > see an entire conversation (correlation id) or to see what causes what (causation id). Not all tests passing, but let's verify the idea and fix later. Issue: #346 [ci skip]
Please send feedback for https://github.com/RailsEventStore/rails_event_store/pull/362/files |
I still prefer the individual fields/columns for the correlation ID and causation ID so linking to streams isn’t needed and metadata doesn’t have those fields. However, in keeping to the existing schema and utilizing existing features, I think #362 looks good to me. |
#362 this is 2 layers above this issues. You can still put them into individual columns if you want to. |
|
Related #364 - |
|
|
PoC: module RubyEventStore
class CorrelatedCommands
def initialize(event_store, command_bus)
@event_store = event_store
@command_bus = command_bus
end
class MiniEvent < Struct.new(:correlation_id, :message_id)
end
def call(command)
if event_store.metadata[:correlation_id] && event_store.metadata[:causation_id]
command.correlate_with(MiniEvent.new(
event_store.metadata[:correlation_id],
event_store.metadata[:causation_id],
))
event_store.with_metadata(
correlation_id: event_store.metadata[:correlation_id],
causation_id: command.message_id,
) do
@command_bus.call(command)
end
else
event_store.with_metadata(
correlation_id: command.message_id,
causation_id: command.message_id,
) do
@command_bus.call(command)
end
end
end
end
end |
* commands are correlated with events which triggered them inside sync handlers * events are correlated with commands which trigger them via command_bus Async handlers not yet approached. Issue: #346
|
which help with correlating commands/events coming from async handlers. Issue: #346
in Rails 4.2.* It doesn't really test much (which was revealed by mutation testing and thus changed to :async) because it executes the job in the same thread and thus getting the metadata that we prepared for sync-handlers. So not really testing what we have in mind. We should include additional integration tests which would verify with a few other adapters for safety. Issue: #346
to be double sure that our serialization for async handlers works properly. Issue: #346 ----- ActiveJob::Base.queue_adapter global ----- Unfortunately in rails 4.2 Active job queue adapter was a class variable shared between all classes: module ActiveJob module QueueAdapter #:nodoc: extend ActiveSupport::Concern module ClassMethods mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter } def queue_adapter=(name_or_adapter) @@queue_adapter = \ case name_or_adapter when :test ActiveJob::QueueAdapters::TestAdapter.new when Symbol, String load_adapter(name_or_adapter) else name_or_adapter if name_or_adapter.respond_to?(:enqueue) end end as a result writing: class MySidekiqHandler < ActiveJob::Base self.queue_adapter = :sidekiq affected all tests :( and set the queue adapter to sidekiq for all handlers and not this one particular only. Rails 5.2 uses class_attribute from ActiveSupport: * Declare a class-level attribute whose value is inheritable by subclasses. * Subclasses can change their own value and it will not impact parent class. but we need our tests to work on both versions right now. So I decided to operate on ActiveJob::Base.queue_adapter in all test to unify the behavior between all versions that we support. ----- Sidekiq testing ----- I did not plan originally to use Sidekiq::Testing as I wanted be as close to real scenario as possible. But I could not find an easy way to run a real worker that would get the jobs from redis and process them and then quit. This seems to be good enough as we go through the serialization process: class Client alias_method :raw_push_real, :raw_push def raw_push(payloads) if Sidekiq::Testing.fake? payloads.each do |job| job = Sidekiq.load_json(Sidekiq.dump_json(job)) job.merge!('enqueued_at' => Time.now.to_f) unless job['at'] Queues.push(job['queue'], job['class'], job) end true elsif Sidekiq::Testing.inline? payloads.each do |job| klass = Sidekiq::Testing.constantize(job['class']) job['id'] ||= SecureRandom.hex(12) job_hash = Sidekiq.load_json(Sidekiq.dump_json(job)) klass.process_job(job_hash) end true else raw_push_real(payloads) end end end I run Sidekiq::Worker.drain_all in a new thread to decouple from thread-level global variables. https://github.com/mperham/sidekiq/wiki/Testing ----- STDOUT ----- Sidekiq is unhappy about Rails.env when we require it: if defined?(::Rails) && Rails.respond_to?(:env) && !Rails.env.test? puts("⛔️ WARNING: Sidekiq testing API enabled...") I reassign $stdout to mute this warning which is not of value for us.
Imagine if dispatcher wrapped calling event handlers in
That would immediately help in debugging and propagating correlation between initial event and events triggered by it. If the initial event gets correlation_id copied from command then we also know which command/request triggered the whole tree of events.
Question: How to handle async event handlers.
Related:
with_metadata
#327The text was updated successfully, but these errors were encountered: