Correlation ID as 1st class citizen #346

Closed
paneq opened this issue May 5, 2018 · 17 comments

@paneq
Member

paneq commented May 5, 2018

Imagine if the dispatcher wrapped calling event handlers in:

def call(subscriber, event)
  res.with_metadata(correlation_id: event.metadata[:correlation_id] || event.event_id) do
    subscriber = subscriber.new if Class === subscriber
    subscriber.call(event)
  end
end

That would immediately help in debugging and propagating correlation between initial event and events triggered by it. If the initial event gets correlation_id copied from command then we also know which command/request triggered the whole tree of events.
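
A minimal sketch of how such a wrapper could behave, assuming the metadata lives in thread-local storage. `TinyStore`, `Event` and `Dispatcher` are hypothetical stand-ins written for this illustration, not the real Ruby Event Store classes.

```ruby
# Hypothetical stand-in for the event store client (not the real API).
class TinyStore
  # Metadata currently in effect for this thread.
  def metadata
    Thread.current[:tiny_store_metadata] || {}
  end

  # Merges extra metadata for the duration of the block and restores the
  # previous value afterwards, even if the block raises.
  def with_metadata(extra)
    previous = Thread.current[:tiny_store_metadata]
    Thread.current[:tiny_store_metadata] = metadata.merge(extra)
    yield
  ensure
    Thread.current[:tiny_store_metadata] = previous
  end
end

# Hypothetical event with only the fields this sketch needs.
Event = Struct.new(:event_id, :metadata)

class Dispatcher
  def initialize(store)
    @store = store
  end

  # Falls back to the event's own id when it carries no correlation_id.
  def call(subscriber, event)
    correlation_id = event.metadata[:correlation_id] || event.event_id
    @store.with_metadata(correlation_id: correlation_id) do
      subscriber = subscriber.new if Class === subscriber
      subscriber.call(event)
    end
  end
end

store = TinyStore.new
seen = []
handler = ->(_event) { seen << store.metadata[:correlation_id] }

dispatcher = Dispatcher.new(store)
dispatcher.call(handler, Event.new("e-1", {}))
dispatcher.call(handler, Event.new("e-2", { correlation_id: "c-9" }))
```

Anything the handler publishes inside the block could pick up the correlation_id from the store's current metadata, which is the propagation described above.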

Question: how to handle async event handlers?

Related:

@paneq
Member Author

paneq commented May 7, 2018

They are both really simple patterns; I have never quite understood why
they end up so misunderstood.

Let's say every message has 3 ids. 1 is its id. Another is correlation,
the last is causation.

The rules are quite simple. If you are responding to a message, you
copy its correlation id as your correlation id, its message id is your
causation id.

This allows you to see an entire conversation (correlation id) or to
see what causes what (causation id).

https://groups.google.com/forum/#!topic/event-store/pvk2iEBgBtA
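
The rules quoted above can be sketched in a few lines. `Message` and `reply` are hypothetical names used only for this illustration, and starting the conversation with the first message's own id as its correlation id is an assumption.

```ruby
# Hypothetical message carrying the three ids from the quote.
Message = Struct.new(:message_id, :correlation_id, :causation_id) do
  # Responding to a message: copy its correlation id, and use its
  # message id as the causation id of the response.
  def reply(new_id)
    self.class.new(new_id, correlation_id, message_id)
  end
end

# Assumption: the first message correlates with itself and has no cause.
first  = Message.new("m-1", "m-1", nil)
second = first.reply("m-2")
third  = second.reply("m-3")

# The whole conversation shares one correlation id...
conversation = [first, second, third].group_by(&:correlation_id)
# ...while causation ids link each message to its direct trigger.
chain = [second, third].map { |m| [m.causation_id, m.message_id] }
```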

@paneq
Member Author

paneq commented May 9, 2018

Just as Ruby Event Store automatically fills out the timestamp, it should also automatically fill out causation_id and correlation_id from event.id if they are blank/not provided. That way the 1st event in a chain is going to have all 3 ids filled out instead of having an ID only.
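
A rough sketch of that defaulting rule. `with_default_ids` is a hypothetical helper written for this illustration, not part of Ruby Event Store, and treating nil and the empty string as "blank" is an assumption.

```ruby
# Hypothetical helper: falls back to the event's own id for any tracking id
# that is missing or blank in the provided metadata.
def with_default_ids(event_id, metadata)
  provided = metadata.reject { |_key, value| value.nil? || value == "" }
  { correlation_id: event_id, causation_id: event_id }.merge(provided)
end

# First event in a chain: nothing provided, so both ids come from its own id.
first_in_chain = with_default_ids("e-1", {})

# Partially filled metadata: only the blank id is defaulted.
partial = with_default_ids("e-2", { correlation_id: "e-1", causation_id: nil })
```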

@paneq
Member Author

paneq commented May 9, 2018

Also, it would be nice to have a wrapper for command_bus which would read causation_id and correlation_id and copy them to the command as well. That would make it possible to track the event->command->event flow, as compared to event->event.

@paneq
Member Author

paneq commented May 9, 2018

event.correlate_with(another_event_or_command) could be a nice low-level API to copy those values from another_event_or_command to event's metadata.

  • RubyEventStore::Event
  • RubyEventStore::Proto
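
A minimal sketch of what such a low-level API might look like. `SketchMessage` is a hypothetical class standing in for both events and commands; falling back to the other message's id when it has no correlation_id is an assumption of this sketch, not confirmed behaviour.

```ruby
# Hypothetical message type; real events and commands would carry more data.
class SketchMessage
  attr_reader :message_id, :metadata

  def initialize(message_id, metadata = {})
    @message_id = message_id
    @metadata = metadata
  end

  def correlation_id
    metadata[:correlation_id]
  end

  def causation_id
    metadata[:causation_id]
  end

  # Copies tracking ids from another message into this message's metadata.
  # Returns self so the call can be chained.
  def correlate_with(other)
    metadata[:correlation_id] = other.correlation_id || other.message_id
    metadata[:causation_id] = other.message_id
    self
  end
end

command   = SketchMessage.new("cmd-1")
event     = SketchMessage.new("evt-1").correlate_with(command)
follow_up = SketchMessage.new("evt-2").correlate_with(event)
```

Here `follow_up` keeps the correlation id of the original command while its causation id points at `event`, so both the whole conversation and the direct trigger stay traceable.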

@joelvh
Contributor

joelvh commented May 15, 2018

Should correlation ID and causation ID be individual columns in the SQL store? If they are first-class citizens, this would at least make storage and retrieval more efficient. I think of metadata as data stored for when an individual event is read, whereas I can see querying on the other IDs to get a subset of events.

@paneq
Member Author

paneq commented May 15, 2018

@joelvh my plan is to link them to streams https://blog.arkency.com/correlation-id-and-causation-id-in-evented-systems/

@joelvh
Contributor

joelvh commented May 16, 2018

Fair enough. I think having some of this behavior as options you can toggle on or off would be ideal, such as automatically linking causation and correlation streams.

@paneq
Member Author

paneq commented May 22, 2018

@joelvh yep, exactly. I was thinking about an event handler that you can subscribe manually.

paneq added a commit that referenced this issue May 22, 2018
https://blog.arkency.com/correlation-id-and-causation-id-in-evented-systems/

> Let’s say every message has 3 ids. 1 is its id. Another is correlation, the last
> is causation. If you are responding to a message, you copy its correlation id
> as your correlation id, its message id is your causation id. This allows you to
> see an entire conversation (correlation id) or to see what causes what (causation id).

Not all tests passing, but let's verify the idea and fix later.

Issue: #346

[ci skip]
@paneq
Member Author

paneq commented May 22, 2018

@joelvh
Contributor

joelvh commented May 22, 2018

I still prefer the individual fields/columns for the correlation ID and causation ID so linking to streams isn’t needed and metadata doesn’t have those fields. However, in keeping to the existing schema and utilizing existing features, I think #362 looks good to me.

@paneq
Member Author

paneq commented May 22, 2018

> I still prefer the individual fields/columns for the correlation ID and causation ID

#362 is 2 layers above this issue. You can still put them into individual columns if you want to.

paneq added a commit that referenced this issue May 28, 2018
@paneq
Member Author

paneq commented May 28, 2018

  • event.correlate_with(event) - documentation
  • command ID's API
  • event.correlate_with(command)

@paneq
Member Author

paneq commented May 28, 2018

Related #364 - command.message_id & event.message_id

paneq added a commit that referenced this issue May 29, 2018
paneq added a commit that referenced this issue May 29, 2018
* message_id as general Message interface compatible
  with events and commands.

Issue: #346 #364
paneq added a commit that referenced this issue May 29, 2018
@paneq
Member Author

paneq commented May 29, 2018

  • Dispatcher wrapped calling event handlers in correlation block

@paneq paneq added this to the v0.31 milestone May 29, 2018
@paneq paneq self-assigned this May 29, 2018
paneq added a commit that referenced this issue Jun 15, 2018
paneq added a commit that referenced this issue Jun 15, 2018
@paneq
Member Author

paneq commented Jun 15, 2018

  • commands correlated with events which trigger them
  • async handlers

@paneq
Member Author

paneq commented Jun 15, 2018

PoC:

module RubyEventStore
  class CorrelatedCommands
    # Minimal message-like object that command.correlate_with can read from.
    MiniEvent = Struct.new(:correlation_id, :message_id)

    def initialize(event_store, command_bus)
      @event_store = event_store
      @command_bus = command_bus
    end

    def call(command)
      correlation_id = event_store.metadata[:correlation_id]
      causation_id   = event_store.metadata[:causation_id]

      if correlation_id && causation_id
        # Called while handling another message: correlate the command
        # with it, then make the command the cause of whatever follows.
        command.correlate_with(MiniEvent.new(correlation_id, causation_id))
        event_store.with_metadata(
          correlation_id: correlation_id,
          causation_id: command.message_id,
        ) do
          command_bus.call(command)
        end
      else
        # No tracking ids in effect: the command starts a new conversation.
        event_store.with_metadata(
          correlation_id: command.message_id,
          causation_id: command.message_id,
        ) do
          command_bus.call(command)
        end
      end
    end

    private

    # The original PoC called event_store without defining a reader for it.
    attr_reader :event_store, :command_bus
  end
end

paneq added a commit that referenced this issue Jun 19, 2018
* commands are correlated with events which triggered them inside sync
  handlers
* events are correlated with commands which trigger them via command_bus

Async handlers not yet approached.

Issue: #346
@paneq
Member Author

paneq commented Jun 19, 2018

  • async handlers helpers documentation
  • Handlers persisting to certain streams based on metadata

paneq added a commit that referenced this issue Jun 22, 2018
which help with correlating commands/events coming from async handlers.

Issue: #346
paneq added a commit that referenced this issue Jun 25, 2018
in Rails 4.2.*

It doesn't really test much (which was revealed by mutation testing and
thus changed to :async) because it executes the job in the same thread
and thus gets the metadata that we prepared for sync handlers. So it is
not really testing what we have in mind.

We should include additional integration tests which would verify this
with a few other adapters for safety.

Issue: #346
paneq added a commit that referenced this issue Jun 25, 2018
to be double sure that our serialization for async handlers works
properly.

Issue: #346

----- ActiveJob::Base.queue_adapter global -----

Unfortunately, in Rails 4.2 the Active Job queue adapter was a class
variable shared between all classes:

module ActiveJob
  module QueueAdapter #:nodoc:
    extend ActiveSupport::Concern
    module ClassMethods
      mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter }
      def queue_adapter=(name_or_adapter)
        @@queue_adapter = \
          case name_or_adapter
          when :test
            ActiveJob::QueueAdapters::TestAdapter.new
          when Symbol, String
            load_adapter(name_or_adapter)
          else
            name_or_adapter if name_or_adapter.respond_to?(:enqueue)
          end
      end

as a result writing:

class MySidekiqHandler < ActiveJob::Base
  self.queue_adapter = :sidekiq

affected all tests :( and set the queue adapter to sidekiq for all
handlers, not only for this particular one.

Rails 5.2 uses class_attribute from ActiveSupport:
* Declare a class-level attribute whose value is inheritable by subclasses.
* Subclasses can change their own value and it will not impact parent class.

but we need our tests to work on both versions right now.

So I decided to operate on ActiveJob::Base.queue_adapter in all tests to
unify the behavior between all versions that we support.
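
The difference between the two behaviors can be reproduced in plain Ruby. This is only an illustration: the class names are hypothetical, and the second half imitates the inheritable semantics described above rather than reproducing the actual class_attribute implementation.

```ruby
# Class variable: a single value shared by the whole class hierarchy.
class SharedBase
  @@adapter = :inline

  def self.adapter
    @@adapter
  end

  def self.adapter=(value)
    @@adapter = value
  end
end

class SharedChild < SharedBase; end

# Per-class value that falls back to the parent when not overridden.
class InheritableBase
  class << self
    attr_writer :adapter

    def adapter
      return @adapter if instance_variable_defined?(:@adapter)
      superclass.respond_to?(:adapter) ? superclass.adapter : :inline
    end
  end
end

class InheritableChild < InheritableBase; end

SharedChild.adapter = :sidekiq       # leaks into SharedBase as well
InheritableChild.adapter = :sidekiq  # stays local to InheritableChild

shared      = [SharedBase.adapter, SharedChild.adapter]
inheritable = [InheritableBase.adapter, InheritableChild.adapter]
```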

----- Sidekiq testing -----

I did not originally plan to use Sidekiq::Testing as I wanted to be as
close to a real scenario as possible. But I could not find an easy way to
run a real worker that would get the jobs from redis, process them and
then quit.

This seems to be good enough as we go through the serialization process:

  class Client
    alias_method :raw_push_real, :raw_push

    def raw_push(payloads)
      if Sidekiq::Testing.fake?
        payloads.each do |job|
          job = Sidekiq.load_json(Sidekiq.dump_json(job))
          job.merge!('enqueued_at' => Time.now.to_f) unless job['at']
          Queues.push(job['queue'], job['class'], job)
        end
        true
      elsif Sidekiq::Testing.inline?
        payloads.each do |job|
          klass = Sidekiq::Testing.constantize(job['class'])
          job['id'] ||= SecureRandom.hex(12)
          job_hash = Sidekiq.load_json(Sidekiq.dump_json(job))
          klass.process_job(job_hash)
        end
        true
      else
        raw_push_real(payloads)
      end
    end
  end

I run Sidekiq::Worker.drain_all in a new thread to decouple from
thread-level global variables.

https://github.com/mperham/sidekiq/wiki/Testing
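
A small illustration of why a new thread decouples the job from thread-level state, assuming the metadata is kept in thread-local storage. The `:sketch_correlation_id` key is hypothetical and used only here.

```ruby
# Value prepared in the current thread, as a sync handler would see it.
Thread.current[:sketch_correlation_id] = "c-1"

same_thread = Thread.current[:sketch_correlation_id]

# A new thread starts with its own empty thread-local storage, so code run
# there cannot accidentally rely on the value set above.
other_thread = Thread.new { Thread.current[:sketch_correlation_id] }.value

# Clean up so the sketch leaves no state behind.
Thread.current[:sketch_correlation_id] = nil
```

If the handler still ends up with the right correlation data when drained in another thread, that data must have travelled through the serialized job and not through thread-local state.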

----- STDOUT -----

Sidekiq is unhappy about Rails.env when we require it:

if defined?(::Rails) && Rails.respond_to?(:env) && !Rails.env.test?
  puts("⛔️ WARNING: Sidekiq testing API enabled...")

I reassign $stdout to mute this warning which is not of value for us.
paneq added a commit that referenced this issue Jun 26, 2018
paneq added a commit that referenced this issue Jun 27, 2018
paneq added a commit that referenced this issue Jun 28, 2018
but defaults to configuration described in docs.

* aliased CorrelatedCommands to RubyEventStore::CorrelatedCommands

Issue: #221 #346
paneq added a commit that referenced this issue Jun 28, 2018
paneq added a commit that referenced this issue Jun 29, 2018
@paneq paneq closed this as completed Jun 29, 2018
paneq added a commit that referenced this issue Jul 1, 2018
paneq added a commit that referenced this issue Jul 2, 2018
paneq added a commit that referenced this issue Jul 2, 2018
@paneq paneq mentioned this issue Sep 7, 2021