Skip to content

Commit

Permalink
[Filebeat][httpjson] Make httpjson use cursor input when using date c…
Browse files Browse the repository at this point in the history
…ursor (#20751) (#21384)

* Fix duplicate import

* Move config to its own package

* Minor improvements

* Fix tests

* Create input manager

* Change requester to accept and store a cursor

* Modify input to be embedded

* Create stateless and cursor inputs

* Initialize new input manager on publish

* Add changelog entry and format files

* Move test data folder

* Change tests

* Apply requested changes

(cherry picked from commit 8f9d54b)
  • Loading branch information
marc-gr authored Oct 2, 2020
1 parent 250c4d5 commit df102bd
Show file tree
Hide file tree
Showing 17 changed files with 385 additions and 184 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Add type and sub_type to panw panos fileset {pull}20912[20912]
- Always attempt community_id processor on zeek module {pull}21155[21155]
- Add related.hosts ecs field to all modules {pull}21160[21160]
- Keep cursor state between httpjson input restarts {pull}20751[20751]

*Heartbeat*

Expand Down
7 changes: 3 additions & 4 deletions filebeat/input/v2/input-cursor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/elastic/go-concert/unison"

input "github.com/elastic/beats/v7/filebeat/input/v2"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -145,7 +144,7 @@ func (cim *InputManager) shutdown() {

// Create builds a new v2.Input using the provided Configure function.
// The Input will run a go-routine per source that has been configured.
func (cim *InputManager) Create(config *common.Config) (input.Input, error) {
func (cim *InputManager) Create(config *common.Config) (v2.Input, error) {
if err := cim.init(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -180,7 +179,7 @@ func (cim *InputManager) Create(config *common.Config) (input.Input, error) {

// Lock locks a key for exclusive access and returns an resource that can be used to modify
// the cursor state and unlock the key.
func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) {
func (cim *InputManager) lock(ctx v2.Context, key string) (*resource, error) {
resource := cim.store.Get(key)
err := lockResource(ctx.Logger, resource, ctx.Cancelation)
if err != nil {
Expand All @@ -190,7 +189,7 @@ func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error)
return resource, nil
}

func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error {
func lockResource(log *logp.Logger, resource *resource, canceler v2.Canceler) error {
if !resource.lock.TryLock() {
log.Infof("Resource '%v' currently in use, waiting...", resource.key)
err := resource.lock.LockContext(canceler)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
return []v2.Plugin{
cloudfoundry.Plugin(),
http_endpoint.Plugin(),
httpjson.Plugin(),
httpjson.Plugin(log, store),
o365audit.Plugin(log, store),
}
}
58 changes: 29 additions & 29 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

// Config contains information about httpjson configuration
// config contains information about httpjson configuration
type config struct {
OAuth2 *OAuth2 `config:"oauth2"`
OAuth2 *oauth2Config `config:"oauth2"`
APIKey string `config:"api_key"`
AuthenticationScheme string `config:"authentication_scheme"`
HTTPClientTimeout time.Duration `config:"http_client_timeout"`
Expand All @@ -30,98 +30,98 @@ type config struct {
JSONObjects string `config:"json_objects_array"`
SplitEventsBy string `config:"split_events_by"`
NoHTTPBody bool `config:"no_http_body"`
Pagination *Pagination `config:"pagination"`
RateLimit *RateLimit `config:"rate_limit"`
Pagination *paginationConfig `config:"pagination"`
RateLimit *rateLimitConfig `config:"rate_limit"`
RetryMax int `config:"retry.max_attempts"`
RetryWaitMin time.Duration `config:"retry.wait_min"`
RetryWaitMax time.Duration `config:"retry.wait_max"`
TLS *tlscommon.Config `config:"ssl"`
URL *URL `config:"url" validate:"required"`
DateCursor *DateCursor `config:"date_cursor"`
URL *urlConfig `config:"url" validate:"required"`
DateCursor *dateCursorConfig `config:"date_cursor"`
}

// Pagination contains information about httpjson pagination settings
type Pagination struct {
type paginationConfig struct {
Enabled *bool `config:"enabled"`
ExtraBodyContent common.MapStr `config:"extra_body_content"`
Header *Header `config:"header"`
Header *headerConfig `config:"header"`
IDField string `config:"id_field"`
RequestField string `config:"req_field"`
URLField string `config:"url_field"`
URL string `config:"url"`
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (p *Pagination) IsEnabled() bool {
func (p *paginationConfig) isEnabled() bool {
return p != nil && (p.Enabled == nil || *p.Enabled)
}

// HTTP Header information for pagination
type Header struct {
type headerConfig struct {
FieldName string `config:"field_name" validate:"required"`
RegexPattern *regexp.Regexp `config:"regex_pattern" validate:"required"`
}

// HTTP Header Rate Limit information
type RateLimit struct {
type rateLimitConfig struct {
Limit string `config:"limit"`
Reset string `config:"reset"`
Remaining string `config:"remaining"`
}

type DateCursor struct {
Enabled *bool `config:"enabled"`
Field string `config:"field"`
URLField string `config:"url_field" validate:"required"`
ValueTemplate *Template `config:"value_template"`
DateFormat string `config:"date_format"`
InitialInterval time.Duration `config:"initial_interval"`
type dateCursorConfig struct {
Enabled *bool `config:"enabled"`
Field string `config:"field"`
URLField string `config:"url_field" validate:"required"`
ValueTemplate *templateConfig `config:"value_template"`
DateFormat string `config:"date_format"`
InitialInterval time.Duration `config:"initial_interval"`
}

type Template struct {
type templateConfig struct {
*template.Template
}

func (t *Template) Unpack(in string) error {
func (t *templateConfig) Unpack(in string) error {
tpl, err := template.New("tpl").Parse(in)
if err != nil {
return err
}

*t = Template{Template: tpl}
*t = templateConfig{Template: tpl}

return nil
}

type URL struct {
type urlConfig struct {
*url.URL
}

func (u *URL) Unpack(in string) error {
func (u *urlConfig) Unpack(in string) error {
parsed, err := url.Parse(in)
if err != nil {
return err
}

*u = URL{URL: parsed}
*u = urlConfig{URL: parsed}

return nil
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (dc *DateCursor) IsEnabled() bool {
func (dc *dateCursorConfig) isEnabled() bool {
return dc != nil && (dc.Enabled == nil || *dc.Enabled)
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (dc *DateCursor) GetDateFormat() string {
func (dc *dateCursorConfig) getDateFormat() string {
if dc.DateFormat == "" {
return time.RFC3339
}
return dc.DateFormat
}

func (dc *DateCursor) Validate() error {
func (dc *dateCursorConfig) Validate() error {
if dc.DateFormat == "" {
return nil
}
Expand Down Expand Up @@ -154,15 +154,15 @@ func (c *config) Validate() error {
}
}
}
if c.OAuth2.IsEnabled() {
if c.OAuth2.isEnabled() {
if c.APIKey != "" || c.AuthenticationScheme != "" {
return errors.New("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously")
}
}
return nil
}

func defaultConfig() config {
func newDefaultConfig() config {
var c config
c.HTTPMethod = "GET"
c.HTTPClientTimeout = 60 * time.Second
Expand Down
73 changes: 37 additions & 36 deletions x-pack/filebeat/input/httpjson/config_oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,32 @@ import (
"golang.org/x/oauth2/google"
)

// An OAuth2Provider represents a supported oauth provider.
type OAuth2Provider string
// An oauth2Provider represents a supported oauth provider.
type oauth2Provider string

const (
OAuth2ProviderDefault OAuth2Provider = "" // OAuth2ProviderDefault means no specific provider is set.
OAuth2ProviderAzure OAuth2Provider = "azure" // OAuth2ProviderAzure AzureAD.
OAuth2ProviderGoogle OAuth2Provider = "google" // OAuth2ProviderGoogle Google.
oauth2ProviderDefault oauth2Provider = "" // OAuth2ProviderDefault means no specific provider is set.
oauth2ProviderAzure oauth2Provider = "azure" // OAuth2ProviderAzure AzureAD.
oauth2ProviderGoogle oauth2Provider = "google" // OAuth2ProviderGoogle Google.
)

func (p *OAuth2Provider) Unpack(in string) error {
*p = OAuth2Provider(in)
func (p *oauth2Provider) Unpack(in string) error {
*p = oauth2Provider(in)
return nil
}

func (p OAuth2Provider) canonical() OAuth2Provider {
return OAuth2Provider(strings.ToLower(string(p)))
func (p oauth2Provider) canonical() oauth2Provider {
return oauth2Provider(strings.ToLower(string(p)))
}

// OAuth2 contains information about oauth2 authentication settings.
type OAuth2 struct {
// oauth2Config contains information about oauth2 authentication settings.
type oauth2Config struct {
// common oauth fields
ClientID string `config:"client.id"`
ClientSecret string `config:"client.secret"`
Enabled *bool `config:"enabled"`
EndpointParams map[string][]string `config:"endpoint_params"`
Provider OAuth2Provider `config:"provider"`
Provider oauth2Provider `config:"provider"`
Scopes []string `config:"scopes"`
TokenURL string `config:"token_url"`

Expand All @@ -61,25 +61,26 @@ type OAuth2 struct {
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (o *OAuth2) IsEnabled() bool {
func (o *oauth2Config) isEnabled() bool {
return o != nil && (o.Enabled == nil || *o.Enabled)
}

// Client wraps the given http.Client and returns a new one that will use the oauth authentication.
func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client, error) {
ctx = context.WithValue(ctx, oauth2.HTTPClient, client)
func (o *oauth2Config) client(ctx context.Context, client *http.Client) (*http.Client, error) {
// only required to let oauth2 library to find our custom client in the context
ctx = context.WithValue(context.Background(), oauth2.HTTPClient, client)

switch o.GetProvider() {
case OAuth2ProviderAzure, OAuth2ProviderDefault:
switch o.getProvider() {
case oauth2ProviderAzure, oauth2ProviderDefault:
creds := clientcredentials.Config{
ClientID: o.ClientID,
ClientSecret: o.ClientSecret,
TokenURL: o.GetTokenURL(),
TokenURL: o.getTokenURL(),
Scopes: o.Scopes,
EndpointParams: o.GetEndpointParams(),
EndpointParams: o.getEndpointParams(),
}
return creds.Client(ctx), nil
case OAuth2ProviderGoogle:
case oauth2ProviderGoogle:
if o.GoogleJWTFile != "" {
cfg, err := google.JWTConfigFromJSON(o.GoogleCredentialsJSON, o.Scopes...)
if err != nil {
Expand All @@ -100,9 +101,9 @@ func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client,
}

// GetTokenURL returns the TokenURL.
func (o *OAuth2) GetTokenURL() string {
switch o.GetProvider() {
case OAuth2ProviderAzure:
func (o *oauth2Config) getTokenURL() string {
switch o.getProvider() {
case oauth2ProviderAzure:
if o.TokenURL == "" {
return endpoints.AzureAD(o.AzureTenantID).TokenURL
}
Expand All @@ -112,14 +113,14 @@ func (o *OAuth2) GetTokenURL() string {
}

// GetProvider returns provider in its canonical form.
func (o OAuth2) GetProvider() OAuth2Provider {
func (o oauth2Config) getProvider() oauth2Provider {
return o.Provider.canonical()
}

// GetEndpointParams returns endpoint params with any provider ones combined.
func (o OAuth2) GetEndpointParams() map[string][]string {
switch o.GetProvider() {
case OAuth2ProviderAzure:
func (o oauth2Config) getEndpointParams() map[string][]string {
switch o.getProvider() {
case oauth2ProviderAzure:
if o.AzureResource != "" {
if o.EndpointParams == nil {
o.EndpointParams = map[string][]string{}
Expand All @@ -132,26 +133,26 @@ func (o OAuth2) GetEndpointParams() map[string][]string {
}

// Validate checks if oauth2 config is valid.
func (o *OAuth2) Validate() error {
switch o.GetProvider() {
case OAuth2ProviderAzure:
func (o *oauth2Config) Validate() error {
switch o.getProvider() {
case oauth2ProviderAzure:
return o.validateAzureProvider()
case OAuth2ProviderGoogle:
case oauth2ProviderGoogle:
return o.validateGoogleProvider()
case OAuth2ProviderDefault:
case oauth2ProviderDefault:
if o.TokenURL == "" || o.ClientID == "" || o.ClientSecret == "" {
return errors.New("invalid configuration: both token_url and client credentials must be provided")
}
default:
return fmt.Errorf("invalid configuration: unknown provider %q", o.GetProvider())
return fmt.Errorf("invalid configuration: unknown provider %q", o.getProvider())
}
return nil
}

// findDefaultGoogleCredentials will default to google.FindDefaultCredentials and will only be changed for testing purposes
var findDefaultGoogleCredentials = google.FindDefaultCredentials

func (o *OAuth2) validateGoogleProvider() error {
func (o *oauth2Config) validateGoogleProvider() error {
if o.TokenURL != "" || o.ClientID != "" || o.ClientSecret != "" ||
o.AzureTenantID != "" || o.AzureResource != "" || len(o.EndpointParams) > 0 {
return errors.New("invalid configuration: none of token_url and client credentials can be used, use google.credentials_file, google.jwt_file, google.credentials_json or ADC instead")
Expand Down Expand Up @@ -191,7 +192,7 @@ func (o *OAuth2) validateGoogleProvider() error {
return fmt.Errorf("invalid configuration: no authentication credentials were configured or detected (ADC)")
}

func (o *OAuth2) populateCredentialsJSONFromFile(file string) error {
func (o *oauth2Config) populateCredentialsJSONFromFile(file string) error {
if _, err := os.Stat(file); os.IsNotExist(err) {
return fmt.Errorf("invalid configuration: the file %q cannot be found", file)
}
Expand All @@ -210,7 +211,7 @@ func (o *OAuth2) populateCredentialsJSONFromFile(file string) error {
return nil
}

func (o *OAuth2) validateAzureProvider() error {
func (o *oauth2Config) validateAzureProvider() error {
if o.TokenURL == "" && o.AzureTenantID == "" {
return errors.New("invalid configuration: at least one of token_url or tenant_id must be provided")
}
Expand Down
Loading

0 comments on commit df102bd

Please sign in to comment.