Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/fix raft bug #11

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 5 additions & 17 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
- server management (start-proxy, stop-proxy, restart, status, logs, stats, etc.)
- cluster management (raft commands, replica commands, etc.) (low priority)
- other commands (backup, restore, etc.) (low priority)
- replace k6 with wrk for performance tests

## Add Module Tests

Expand All @@ -16,9 +15,6 @@
- [ ] - Add option to specify export variables when ambiguous (?)
- [ ] - check how global variable conflicts are handled

## Start using Pkl

replace dgate server config with pkl.

## dgate-cli declaritive config

Expand Down Expand Up @@ -70,10 +66,6 @@ expose metrics for the following:
- Add Transactions
- [ ] - Add transactional support for admin API

## DGate Documentation (dgate.io/docs)

Use Docusaurus to create the documentation for DGate.

## DGate Admin Console (low priority)

Admin Console is a web-based interface that can be used to manage the state of the cluster. Manage resource, view logs, stats, and more. It can also be used to develop and test modules directly in the browser.
Expand Down Expand Up @@ -136,14 +128,6 @@ A good example of a bundle would be a bundle that adds support for OAuth2 authen
Differing from common resource versioning, modules can have multiple versions that can be used at the same time. This can be used to test new versions of modules before deploying them to the cluster.


## DGate CLI - argument variable suggestions

For example, if the user types an argument that is not recognized, the CLI can suggest the correct argument by search the available arguments and finding the closest match.
```
dgate-cli ns mk my-ns nmae=my-ns
Variable 'nmae' is not recognized. Did you mean 'name'?
```

## DGate CLI - help command show required variables

When the user runs the help command, the CLI should show the required variables for the command. For example, if the user runs `dgate-cli ns mk --help`, the CLI should show the required variables for the `ns mk` command. `name` is a required variable for the `ns mk` command. Also, the CLI should show non-required variables.
Expand All @@ -159,4 +143,8 @@ Add stack tracing for typescript modules.

Currently, Raft Implementation is tightly coupled with the Admin API. This makes it difficult to change the Raft Implementation without changing the Admin API. Decouple the Raft Implementation from the Admin API to make it easier to change the Raft Implementation.

## Add Telemetry (sentry, datadog, etc.)
## Add Telemetry (sentry, datadog, etc.)

## ResourceManager callback for resource changes

Add a callback to the ResourceManager that is called when a resource is changed. This can be used to invalidate caches, update modules, and more.
1 change: 1 addition & 0 deletions functional-tests/raft_tests/test1.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
version: v1
debug: true
log_level: info

tags:
Expand Down
93 changes: 48 additions & 45 deletions internal/admin/admin_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,17 @@
return &dgateAdminFSM{cs, logger}
}

func (fsm *dgateAdminFSM) isReplay(log *raft.Log) bool {
return !fsm.cs.Ready() &&
log.Index+1 >= fsm.cs.Raft().LastIndex() &&
log.Index+1 >= fsm.cs.Raft().AppliedIndex()
func (fsm *dgateAdminFSM) isLatestLog(log *raft.Log) bool {
rft := fsm.cs.Raft()
return log.Index == rft.CommitIndex() ||
log.Index+1 == rft.CommitIndex()

Check warning on line 27 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L24-L27

Added lines #L24 - L27 were not covered by tests
}

func (fsm *dgateAdminFSM) checkLast(log *raft.Log) {
rft := fsm.cs.Raft()
if !fsm.cs.Ready() && fsm.isReplay(log) {
fsm.logger.Info("FSM is not ready, setting ready",
zap.Uint64("index", log.Index),
zap.Uint64("applied-index", rft.AppliedIndex()),
zap.Uint64("last-index", rft.LastIndex()),
)
defer func() {
if err := fsm.cs.ReloadState(false); err != nil {
fsm.logger.Error("Error processing change log in FSM", zap.Error(err))
} else {
fsm.cs.SetReady()
}
}()
func (fsm *dgateAdminFSM) reload(cls ...*spec.ChangeLog) {
if err := fsm.cs.ReloadState(false, cls...); err != nil {
fsm.logger.Error("Error processing change log in FSM", zap.Error(err))
} else {
fsm.cs.SetReady()

Check warning on line 34 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L30-L34

Added lines #L30 - L34 were not covered by tests
}
}

Expand Down Expand Up @@ -82,43 +72,56 @@
}

func (fsm *dgateAdminFSM) Apply(log *raft.Log) any {
defer fsm.checkLast(log)
_, err := fsm.applyLog(log)
rft := fsm.cs.Raft()
fsm.logger.Debug("applying log",
zap.Uint64("current", log.Index),
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
)
cl, err := fsm.applyLog(log)
if err != nil && !fsm.cs.Ready() {
fsm.reload(cl)
} else {
fsm.logger.Error("Error processing change log in FSM", zap.Error(err))

Check warning on line 86 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L75-L86

Added lines #L75 - L86 were not covered by tests
}
return err
}

func (fsm *dgateAdminFSM) ApplyBatch(logs []*raft.Log) []any {
lastLog := logs[len(logs)-1]
if fsm.isReplay(lastLog) {
rft := fsm.cs.Raft()
fsm.logger.Info("applying log batch logs",
zap.Int("size", len(logs)),
zap.Uint64("current", lastLog.Index),
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
)
if len(logs) == 0 || logs == nil {
fsm.logger.Warn("No logs to apply in ApplyBatch")
return nil

Check warning on line 94 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L92-L94

Added lines #L92 - L94 were not covered by tests
}
cls := make([]*spec.ChangeLog, 0, len(logs))
defer func() {
if !fsm.cs.Ready() {
fsm.checkLast(logs[len(logs)-1])
return
}

if err := fsm.cs.ReloadState(true, cls...); err != nil {
fsm.logger.Error("Error reloading state @ FSM ApplyBatch", zap.Error(err))
}
}()
lastLog := logs[len(logs)-1]
rft := fsm.cs.Raft()
fsm.logger.Info("applying batch logs",
zap.Int("size", len(logs)),
zap.Uint64("current", lastLog.Index),
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
)

Check warning on line 104 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L96-L104

Added lines #L96 - L104 were not covered by tests

cls := make([]*spec.ChangeLog, 0, len(logs))

Check warning on line 106 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L106

Added line #L106 was not covered by tests
results := make([]any, len(logs))
for i, log := range logs {
var cl *spec.ChangeLog
cl, results[i] = fsm.applyLog(log)
if cl != nil {
var (
cl *spec.ChangeLog
err error
)
if cl, err = fsm.applyLog(log); err != nil {
results[i] = err
fsm.logger.Error("Error processing change log in FSM", zap.Error(err))
} else {

Check warning on line 116 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L109-L116

Added lines #L109 - L116 were not covered by tests
cls = append(cls, cl)
}
}

if fsm.cs.Ready() || fsm.isLatestLog(lastLog) {
fsm.reload(cls...)

Check warning on line 122 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L121-L122

Added lines #L121 - L122 were not covered by tests
}

return results
}

Expand Down
67 changes: 24 additions & 43 deletions internal/admin/admin_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import (
"fmt"
"log"
"net"
"net/http"
"strings"

Expand Down Expand Up @@ -75,43 +74,20 @@
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if ipList.Len() > 0 {
remoteHost, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
remoteHost = r.RemoteAddr
}
allowed, err := ipList.Contains(remoteHost)
if !allowed && adminConfig.XForwardedForDepth > 0 {
xForwardedForIps := r.Header.Values("X-Forwarded-For")
count := min(adminConfig.XForwardedForDepth, len(xForwardedForIps))
for i := 0; i < count; i++ {
allowed, err = ipList.Contains(xForwardedForIps[i])
if err != nil {
logger.Error("error checking x-forwarded-for ip",
zap.Error(err),
)
if conf.Debug {
http.Error(w, "Bad Request: could not parse x-forwarded-for IP address", http.StatusBadRequest)
}
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
if allowed {
break
}
}
}

remoteIp := util.GetTrustedIP(r,
conf.AdminConfig.XForwardedForDepth)
allowed, err := ipList.Contains(remoteIp)

Check warning on line 79 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L77-L79

Added lines #L77 - L79 were not covered by tests
if err != nil {
if conf.Debug {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
http.Error(w, "could not parse X-Forwarded-For IP", http.StatusBadRequest)

Check warning on line 85 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L85

Added line #L85 was not covered by tests
return
}
if !allowed {
if conf.Debug {
http.Error(w, "Unauthorized IP Address: "+remoteHost, http.StatusUnauthorized)
http.Error(w, "Unauthorized IP Address: "+remoteIp, http.StatusUnauthorized)

Check warning on line 90 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L90

Added line #L90 was not covered by tests
return
}
http.Error(w, "Unauthorized", http.StatusUnauthorized)
Expand All @@ -138,24 +114,25 @@
} else if adminConfig.KeyAuth.HeaderName != "" {
key = r.Header.Get(adminConfig.KeyAuth.HeaderName)
} else {
key = r.Header.Get("X-API-Key")
key = r.Header.Get("X-DGate-Key")

Check warning on line 117 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L117

Added line #L117 was not covered by tests
}
if _, keyFound := keyMap[key]; !keyFound {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
}
raftInstance := cs.Raft()
if r.Method == http.MethodPut && raftInstance != nil {
leader := raftInstance.Leader()
if leader == "" {
util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader")
return
}
if raftInstance.State() != raft.Leader {
r.URL.Host = string(leader)
http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect)
return
if raftInstance := cs.Raft(); raftInstance != nil {
if r.Method == http.MethodPut || r.Method == http.MethodDelete {
leader := raftInstance.Leader()
if leader == "" {
util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader")
return

Check warning on line 129 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L124-L129

Added lines #L124 - L129 were not covered by tests
}
if raftInstance.State() != raft.Leader {
r.URL.Host = string(leader)
http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect)
return

Check warning on line 134 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L131-L134

Added lines #L131 - L134 were not covered by tests
}
}
}

Expand All @@ -165,10 +142,14 @@

server.Get("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
w.Header().Set("X-DGate-Raft", fmt.Sprintf("%t", cs.Raft() != nil))
w.Header().Set("X-DGate-WatchOnly", fmt.Sprintf("%t", adminConfig.WatchOnly))
w.Header().Set("X-DGate-ChangeHash", fmt.Sprintf("%d", cs.ChangeHash()))
w.Header().Set("X-DGate-AdminAPI", "true")
if raftInstance := cs.Raft(); raftInstance != nil {
w.Header().Set(
"X-DGate-Raft-State",
raftInstance.State().String(),
)

Check warning on line 151 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L147-L151

Added lines #L147 - L151 were not covered by tests
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("DGate Admin API"))
}))
Expand Down
27 changes: 15 additions & 12 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,22 @@ type (
}

DGateProxyConfig struct {
Host string `koanf:"host"`
Port int `koanf:"port"`
TLS *DGateTLSConfig `koanf:"tls"`
EnableH2C bool `koanf:"enable_h2c"`
EnableHTTP2 bool `koanf:"enable_http2"`
EnableConsoleLogger bool `koanf:"enable_console_logger"`
RedirectHttpsDomains []string `koanf:"redirect_https"`
AllowedDomains []string `koanf:"allowed_domains"`
GlobalHeaders map[string]string `koanf:"global_headers"`
Transport DGateHttpTransportConfig `koanf:"client_transport"`
Host string `koanf:"host"`
Port int `koanf:"port"`
TLS *DGateTLSConfig `koanf:"tls"`
EnableH2C bool `koanf:"enable_h2c"`
EnableHTTP2 bool `koanf:"enable_http2"`
EnableConsoleLogger bool `koanf:"enable_console_logger"`
RedirectHttpsDomains []string `koanf:"redirect_https"`
AllowedDomains []string `koanf:"allowed_domains"`
GlobalHeaders map[string]string `koanf:"global_headers"`
Transport DGateHttpTransportConfig `koanf:"client_transport"`
DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"`
StrictMode bool `koanf:"strict_mode"`
XForwardedForDepth int `koanf:"x_forwarded_for_depth"`

// WARN: debug use only
InitResources *DGateResources `koanf:"init_resources"`
DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"`
InitResources *DGateResources `koanf:"init_resources"`
}

DGateTestServerConfig struct {
Expand Down
3 changes: 2 additions & 1 deletion internal/config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) {
shell := "/bin/sh"
if shellEnv := os.Getenv("SHELL"); shellEnv != "" {
shell = shellEnv
}
}
resolveConfigStringPattern(data, CommandRegex, func(value string, results map[string]string) (string, error) {
cmdResult, err := exec.CommandContext(
ctx, shell, "-c", results["cmd"]).Output()
Expand Down Expand Up @@ -182,6 +182,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) {
}
if k.Exists("admin") {
kDefault(k, "admin.host", "127.0.0.1")
kDefault(k, "admin.x_forwarded_for_depth", -1)
err = kRequireAll(k, "admin.port")
if err != nil {
return nil, err
Expand Down
Loading
Loading