Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Move StopTimeUpdate consumer into a new pipeline #493

Merged
merged 11 commits into from
Mar 11, 2020

Conversation

arkadyan
Copy link
Contributor

@arkadyan arkadyan commented Mar 5, 2020

Copy link
Member

@paulswartz paulswartz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some small comments before I hop on a plane

lib/concentrate/pipeline.ex Outdated Show resolved Hide resolved
Comment on lines +13 to +20
def source(opts) do
Pipeline.source(
:trip_updates_enhanced,
opts[:trip_updates_url],
Concentrate.Parser.GTFSRealtimeEnhanced
)
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this behave well if the trip_updates_url isn't set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I was thinking that isn't something we have a need to worry about—is there something I'm missing?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might cause problems to any non-MBTA users since they won't be able to give a trip_updates_url that has the format we expect, but wouldn't make a difference to us.

Comment on lines 23 to 26
Enum.concat([
Concentrate.Pipeline.VehiclePositionsPipeline.pipeline(opts),
Concentrate.Pipeline.StopTimeUpdatesPipeline.pipeline(opts)
])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking of the stop times pipeline as being pretty independent from the concentrate pipeline. What do you think about giving them separate supervisors to reflect that, and so if one pipeline dies, the other one doesn't have to get restarted with it. (I'm imagining both pipeline-supervisors would be started by Realtime.Supervisor)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added individual supervisors for each pipeline.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 They can be restarted separately, but I think the structure is more complex than necessary.
It's now

Realtime.Supervisor
├ ...
└ Concentrate.Supervisor
  ├ Concentrate.Pipeline.VehiclePositionsPipelineSupervisor
  │ └ Concentrate.Pipeline.VehiclePositionsPipeline
  │   ├ Concentrate.Producer.HTTP
  │   ├ Concentrate.Producer.HTTP
  │   ├ Concentrate.Merge
  │   └ Concentrate.Consumer.VehiclePositions
  └ Concentrate.Pipeline.StopTimeUpdatesPipelineSupervisor
    └ Concentrate.Pipeline.StopTimeUpdatesPipeline
      ├ Concentrate.Producer.HTTP
      └ Concentrate.Consumer.StopTimeUpdates

but we can knock out a couple layers like this:

Realtime.Supervisor
├ ...
├ Concentrate.VehiclePositionsPipeline
│ ├ Concentrate.Producer.HTTP
│ ├ Concentrate.Producer.HTTP
│ ├ Concentrate.Merge
│ └ Concentrate.Consumer.VehiclePositions
└ Concentrate.Pipeline.StopTimeUpdatesPipeline
  ├ Concentrate.Producer.HTTP
  └ Concentrate.Consumer.StopTimeUpdates

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was intentional to encapsulate all the pipeline configuration knowledge. It didn't seem like something the Realtime supervisor should be concerned with.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realtime.Supervisor wouldn't know anything about how the pipelines are set up. It would just know that there's 2 separate pipelines, one for VPs and one for STUs.

All the knowledge about which children to start and which env vars to use would be written in VehiclePositionsPipeline or StopTimeUpdatesPipeline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, knowing about what pipelines exist is what I wanted to encapsulate.

@codecov
Copy link

codecov bot commented Mar 9, 2020

Codecov Report

Merging #493 into master will increase coverage by 0.18%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master    #493      +/-   ##
=========================================
+ Coverage   97.51%   97.7%   +0.18%     
=========================================
  Files         166     169       +3     
  Lines        3747    3750       +3     
  Branches      516     516              
=========================================
+ Hits         3654    3664      +10     
+ Misses         90      83       -7     
  Partials        3       3
Impacted Files Coverage Δ
lib/concentrate/consumer/stop_time_updates.ex 100% <100%> (ø) ⬆️
...concentrate/pipeline/stop_time_updates_pipeline.ex 100% <100%> (ø)
lib/concentrate/pipeline.ex 100% <100%> (ø)
...concentrate/pipeline/vehicle_positions_pipeline.ex 100% <100%> (ø)
lib/concentrate/supervisor.ex 100% <100%> (+9.09%) ⬆️
lib/concentrate/merge.ex 89.28% <0%> (+5.35%) ⬆️
lib/concentrate/consumer/vehicle_positions.ex 100% <0%> (+14.28%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 9b1e385...223078d. Read the comment docs.


@impl true
def init(opts) do
Supervisor.init(Concentrate.Pipeline.StopTimeUpdatesPipeline.init(opts), strategy: :rest_for_one)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also something like you could include in Pipeline, since you can pass a list of children to Supervisor.start_link. The two separate pipelines would be configured like this:

{Concentrate.Pipeline, module: Concentrate.Pipeline.StopTimeUpdates, ..other_opts..},
{Concentrate.Pipeline, module: Concentrate.Pipeline.VehiclePositions, ..other opts..}

The init/1 callback for Pipeline behaviors is the same, and then you implement start_link/1 and child_spec/1 on the Pipeline module directly. This is also a case where you'll want an id on the child spec: you can probably use the module for the ID.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like I might not be fully grasping what you are saying here, perhaps starting with the bit about "you can pass a list of children to Supervisor.start_link". Are you suggesting something counter to the suggestion Sky made here of separating out the pipelines from a single supervisor? Or is what you are saying different?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll write it up more thoroughly tomorrow morning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! We can also talk through it in person if that's easier.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defmodule Pipeline do
  @moduledoc """Create a pipeline supervisor for a given module."""
  
  @callback init(Keyword.t) :: [Supervisor.child_spec]

  def start_link(mod, opts) do
    children = mod.init(opts)
    Supervisor.start_link(children, restart: :rest_for_one)
  end

  def child_spec(opts) do
   mod = Keyword.fetch(opts, :module)
    %{
      type: :supervisor, 
      id: mod,
      start: {__MODULE__, :start_link, [mod, opts]}
    }
  end
end

# in Concentrate.Supervisor
def start_link(...) do
  Supervisor.start_link([
    {Pipeline, module: StopTimePipeline, ...},
    {Pipeline, module: VehiclePositionSupervisor, ...}
  ], restart: :one_for_one)
end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that's helpful. Let me know if you have any suggestions on my adaptation.

@arkadyan arkadyan force-pushed the mss-stu-pipeline branch 4 times, most recently from 088ebab to e822509 Compare March 10, 2020 14:22
Copy link
Member

@paulswartz paulswartz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking good!

]

@impl Concentrate.Pipeline
@spec init(opts()) :: [Supervisor.child_spec()]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • You alias Concentrate.Pipeline but then keep using Concentrate.Pipeline in most places (in both modules).
  • You don't need the @spec here, as it's implicitly the same as the @callback you're @impling.

Copy link
Member

@skyqrose skyqrose left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's good now, but I think we should wait for a ✅ from paul, too.

Supervisor.start_link(children, strategy: :rest_for_one)
end

def child_spec(opts) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need a @impl true to say it implements the Supervisor.child_spec callback? (Or would that get in the way of this being a behavior itself?)

(Same for Concentrate.Supervisor.child_spec which doesn't currently have an @impl true.)

Copy link
Contributor Author

@arkadyan arkadyan Mar 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe so because these aren't declaring a behaviour. I added a spec here though.

swiftly_realtime_vehicles_url:
Application.get_env(:skate, :swiftly_realtime_vehicles_url),
trip_updates_url: Application.get_env(:skate, :trip_updates_url)
]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this extra nesting come up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, moved that to the child_spec where it should actually be.

@arkadyan
Copy link
Contributor Author

@paulswartz does this PR look good to you at this point?

Copy link
Member

@paulswartz paulswartz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🍰

@arkadyan arkadyan merged commit 46042ed into master Mar 11, 2020
@arkadyan arkadyan deleted the mss-stu-pipeline branch March 11, 2020 14:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants