Skip to content

Commit

Permalink
Refactor proc into procgroup (#779)
Browse files Browse the repository at this point in the history
This PR is prepping for when locally we run multiple processes of the
Encore application, one for each microservice and one for each gateway.

This starts by refactoring our existing `Proc` struct into `ProcGroup`
while keeping most of the API the same.

Then within a ProcGroup we introduce the concept of `Proc`'s which can
be started independently of each other.
  • Loading branch information
DomBlack authored Jun 29, 2023
1 parent dd9609d commit b95785c
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 204 deletions.
8 changes: 4 additions & 4 deletions cli/daemon/dash/dash.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (h *handler) Handle(ctx context.Context, reply jsonrpc2.Replier, r jsonrpc2
if run == nil {
return reply(ctx, map[string]interface{}{"running": false}, nil)
}
proc := run.Proc()
proc := run.ProcGroup()
if proc == nil {
return reply(ctx, map[string]interface{}{"running": false}, nil)
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func (h *handler) apiCall(ctx context.Context, reply jsonrpc2.Replier, p *apiCal
log.Error().Str("app_id", p.AppID).Msg("dash: cannot make api call: app not running")
return reply(ctx, nil, fmt.Errorf("app not running"))
}
proc := run.Proc()
proc := run.ProcGroup()
if proc == nil {
log.Error().Str("app_id", p.AppID).Msg("dash: cannot make api call: app not running")
return reply(ctx, nil, fmt.Errorf("app not running"))
Expand Down Expand Up @@ -272,7 +272,7 @@ var _ run.EventListener = (*Server)(nil)
// OnStart notifies active websocket clients about the started run.
func (s *Server) OnStart(r *run.Run) {
m := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true}
proc := r.Proc()
proc := r.ProcGroup()
str, err := m.MarshalToString(proc.Meta)
if err != nil {
log.Error().Err(err).Msg("dash: could not marshal app meta")
Expand All @@ -295,7 +295,7 @@ func (s *Server) OnStart(r *run.Run) {
// OnReload notifies active websocket clients about the reloaded run.
func (s *Server) OnReload(r *run.Run) {
m := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true}
proc := r.Proc()
proc := r.ProcGroup()
str, err := m.MarshalToString(proc.Meta)
if err != nil {
log.Error().Err(err).Msg("dash: could not marshal app meta")
Expand Down
6 changes: 3 additions & 3 deletions cli/daemon/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s *Server) Run(req *daemonpb.RunRequest, stream daemonpb.Daemon_RunServer)
ops.AllDone()

stderr.Write([]byte("\n"))
pid := runInstance.Proc().Pid
pid := runInstance.ProcGroup().Gateway.Pid
fmt.Fprintf(stderr, " Encore development server running!\n\n")

fmt.Fprintf(stderr, " Your API is running at: %s\n", aurora.Cyan("http://"+runInstance.ListenAddr))
Expand All @@ -153,7 +153,7 @@ func (s *Server) Run(req *daemonpb.RunRequest, stream daemonpb.Daemon_RunServer)
fmt.Fprintf(stderr, " Process ID: %d\n", aurora.Cyan(pid))
}
// Log which experiments are enabled, if any
if exp := runInstance.Proc().Experiments.List(); len(exp) > 0 {
if exp := runInstance.ProcGroup().Experiments.List(); len(exp) > 0 {
strs := make([]string, len(exp))
for i, e := range exp {
strs[i] = string(e)
Expand Down Expand Up @@ -189,7 +189,7 @@ func (s *Server) Run(req *daemonpb.RunRequest, stream daemonpb.Daemon_RunServer)
case <-runInstance.Done():
return
case <-time.After(5 * time.Second):
if proc := runInstance.Proc(); proc != nil {
if proc := runInstance.ProcGroup(); proc != nil {
showFirstRunExperience(runInstance, proc.Meta, stderr)
}
}
Expand Down
10 changes: 5 additions & 5 deletions cli/daemon/run/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ func (r *Run) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}

proc := r.proc.Load().(*Proc)
proc := r.proc.Load().(*ProcGroup)
proc.forwardReq(endpoint, w, req)
}

// forwardReq forwards the request to the Encore app.
func (p *Proc) forwardReq(endpoint string, w http.ResponseWriter, req *http.Request) {
func (pg *ProcGroup) forwardReq(endpoint string, w http.ResponseWriter, req *http.Request) {
// director is a simplified version from httputil.NewSingleHostReverseProxy.
director := func(r *http.Request) {
r.URL.Scheme = "http"
r.URL.Host = p.Run.ListenAddr
r.URL.Host = pg.Run.ListenAddr
r.URL.Path = "/" + endpoint
r.URL.RawQuery = req.URL.RawQuery
if _, ok := r.Header["User-Agent"]; !ok {
Expand All @@ -47,7 +47,7 @@ func (p *Proc) forwardReq(endpoint string, w http.ResponseWriter, req *http.Requ

// Add the auth key unless the test header is set.
if r.Header.Get(TestHeaderDisablePlatformAuth) == "" {
addAuthKeyToRequest(r, p.authKey)
addAuthKeyToRequest(r, pg.authKey)
}
}

Expand All @@ -57,7 +57,7 @@ func (p *Proc) forwardReq(endpoint string, w http.ResponseWriter, req *http.Requ
transport := &http.Transport{
DisableKeepAlives: true,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return p.Client.Open()
return pg.Gateway.Client.Open()
},
}

Expand Down
4 changes: 2 additions & 2 deletions cli/daemon/run/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type EventListener interface {

// FindProc finds the proc with the given id.
// It reports nil if no such proc was found.
func (mgr *Manager) FindProc(procID string) *Proc {
func (mgr *Manager) FindProc(procID string) *ProcGroup {
mgr.mu.Lock()
defer mgr.mu.Unlock()
for _, run := range mgr.runs {
if p := run.Proc(); p != nil && p.ID == procID {
if p := run.ProcGroup(); p != nil && p.ID == procID {
return p
}
}
Expand Down
Loading

0 comments on commit b95785c

Please sign in to comment.