-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor: Move StopTimeUpdate consumer into a new pipeline #493
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some small comments before I hop on a plane
def source(opts) do | ||
Pipeline.source( | ||
:trip_updates_enhanced, | ||
opts[:trip_updates_url], | ||
Concentrate.Parser.GTFSRealtimeEnhanced | ||
) | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this behave well if the trip_updates_url
isn't set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. I was thinking that isn't something we have a need to worry about—is there something I'm missing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might cause problems to any non-MBTA users since they won't be able to give a trip_updates_url that has the format we expect, but wouldn't make a difference to us.
lib/concentrate/supervisor.ex
Outdated
Enum.concat([ | ||
Concentrate.Pipeline.VehiclePositionsPipeline.pipeline(opts), | ||
Concentrate.Pipeline.StopTimeUpdatesPipeline.pipeline(opts) | ||
]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking of the stop times pipeline as being pretty independent from the concentrate pipeline. What do you think about giving them separate supervisors to reflect that, and so if one pipeline dies, the other one doesn't have to get restarted with it. (I'm imagining both pipeline-supervisors would be started by Realtime.Supervisor)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added individual supervisors for each pipeline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 They can be restarted separately, but I think the structure is more complex than necessary.
It's now
Realtime.Supervisor
├ ...
└ Concentrate.Supervisor
├ Concentrate.Pipeline.VehiclePositionsPipelineSupervisor
│ └ Concentrate.Pipeline.VehiclePositionsPipeline
│ ├ Concentrate.Producer.HTTP
│ ├ Concentrate.Producer.HTTP
│ ├ Concentrate.Merge
│ └ Concentrate.Consumer.VehiclePositions
└ Concentrate.Pipeline.StopTimeUpdatesPipelineSupervisor
└ Concentrate.Pipeline.StopTimeUpdatesPipeline
├ Concentrate.Producer.HTTP
└ Concentrate.Consumer.StopTimeUpdates
but we can knock out a couple layers like this:
Realtime.Supervisor
├ ...
├ Concentrate.VehiclePositionsPipeline
│ ├ Concentrate.Producer.HTTP
│ ├ Concentrate.Producer.HTTP
│ ├ Concentrate.Merge
│ └ Concentrate.Consumer.VehiclePositions
└ Concentrate.Pipeline.StopTimeUpdatesPipeline
├ Concentrate.Producer.HTTP
└ Concentrate.Consumer.StopTimeUpdates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was intentional to encapsulate all the pipeline configuration knowledge. It didn't seem like something the Realtime supervisor should be concerned with.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Realtime.Supervisor
wouldn't know anything about how the pipelines are set up. It would just know that there's 2 separate pipelines, one for VPs and one for STUs.
All the knowledge about which children to start and which env vars to use would be written in VehiclePositionsPipeline
or StopTimeUpdatesPipeline
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, knowing about what pipelines exist is what I wanted to encapsulate.
92a735b
to
24f18eb
Compare
Codecov Report
@@ Coverage Diff @@
## master #493 +/- ##
=========================================
+ Coverage 97.51% 97.7% +0.18%
=========================================
Files 166 169 +3
Lines 3747 3750 +3
Branches 516 516
=========================================
+ Hits 3654 3664 +10
+ Misses 90 83 -7
Partials 3 3
Continue to review full report at Codecov.
|
|
||
@impl true | ||
def init(opts) do | ||
Supervisor.init(Concentrate.Pipeline.StopTimeUpdatesPipeline.init(opts), strategy: :rest_for_one) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also something like you could include in Pipeline
, since you can pass a list of children to Supervisor.start_link
. The two separate pipelines would be configured like this:
{Concentrate.Pipeline, module: Concentrate.Pipeline.StopTimeUpdates, ..other_opts..},
{Concentrate.Pipeline, module: Concentrate.Pipeline.VehiclePositions, ..other opts..}
The init/1
callback for Pipeline
behaviors is the same, and then you implement start_link/1
and child_spec/1
on the Pipeline
module directly. This is also a case where you'll want an id
on the child spec: you can probably use the module for the ID.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like I might not be fully grasping what you are saying here, perhaps starting with the bit about "you can pass a list of children to Supervisor.start_link". Are you suggesting something counter to the suggestion Sky made here of separating out the pipelines from a single supervisor? Or is what you are saying different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll write it up more thoroughly tomorrow morning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! We can also talk through it in person if that's easier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defmodule Pipeline do
@moduledoc """Create a pipeline supervisor for a given module."""
@callback init(Keyword.t) :: [Supervisor.child_spec]
def start_link(mod, opts) do
children = mod.init(opts)
Supervisor.start_link(children, restart: :rest_for_one)
end
def child_spec(opts) do
mod = Keyword.fetch(opts, :module)
%{
type: :supervisor,
id: mod,
start: {__MODULE__, :start_link, [mod, opts]}
}
end
end
# in Concentrate.Supervisor
def start_link(...) do
Supervisor.start_link([
{Pipeline, module: StopTimePipeline, ...},
{Pipeline, module: VehiclePositionSupervisor, ...}
], restart: :one_for_one)
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, that's helpful. Let me know if you have any suggestions on my adaptation.
088ebab
to
e822509
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking good!
] | ||
|
||
@impl Concentrate.Pipeline | ||
@spec init(opts()) :: [Supervisor.child_spec()] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- You alias
Concentrate.Pipeline
but then keep usingConcentrate.Pipeline
in most places (in both modules). - You don't need the
@spec
here, as it's implicitly the same as the@callback
you're@impl
ing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's good now, but I think we should wait for a ✅ from paul, too.
Supervisor.start_link(children, strategy: :rest_for_one) | ||
end | ||
|
||
def child_spec(opts) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need a @impl true
to say it implements the Supervisor.child_spec
callback? (Or would that get in the way of this being a behavior itself?)
(Same for Concentrate.Supervisor.child_spec
which doesn't currently have an @impl true
.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe so because these aren't declaring a behaviour. I added a spec here though.
lib/realtime/supervisor.ex
Outdated
swiftly_realtime_vehicles_url: | ||
Application.get_env(:skate, :swiftly_realtime_vehicles_url), | ||
trip_updates_url: Application.get_env(:skate, :trip_updates_url) | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did this extra nesting come up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, moved that to the child_spec where it should actually be.
@paulswartz does this PR look good to you at this point? |
Split this functionality out from Concentrate.Supervisor.
Separate this out from the VehiclePositionsPipeline since we don't need the merge step and could run on a different cycle.
Rename the root function to init. Rename the existing module to be PipelineHelpers.
74212aa
to
223078d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🍰
Asana ticket: Refactor: Move StopTimeUpdate consumer into a new pipeline