Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat][httpjson] Make httpjson use cursor input when using date cursor #20751

Merged
merged 13 commits into from
Sep 29, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add type and sub_type to panw panos fileset {pull}20912[20912]
- Always attempt community_id processor on zeek module {pull}21155[21155]
- Add related.hosts ecs field to all modules {pull}21160[21160]
- Keep cursor state between httpjson input restarts {pull}20751[20751]

*Heartbeat*

Expand Down
7 changes: 3 additions & 4 deletions filebeat/input/v2/input-cursor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/elastic/go-concert/unison"

input "github.com/elastic/beats/v7/filebeat/input/v2"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -145,7 +144,7 @@ func (cim *InputManager) shutdown() {

// Create builds a new v2.Input using the provided Configure function.
// The Input will run a go-routine per source that has been configured.
func (cim *InputManager) Create(config *common.Config) (input.Input, error) {
func (cim *InputManager) Create(config *common.Config) (v2.Input, error) {
if err := cim.init(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -180,7 +179,7 @@ func (cim *InputManager) Create(config *common.Config) (input.Input, error) {

// Lock locks a key for exclusive access and returns an resource that can be used to modify
// the cursor state and unlock the key.
func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) {
func (cim *InputManager) lock(ctx v2.Context, key string) (*resource, error) {
resource := cim.store.Get(key)
err := lockResource(ctx.Logger, resource, ctx.Cancelation)
if err != nil {
Expand All @@ -190,7 +189,7 @@ func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error)
return resource, nil
}

func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error {
func lockResource(log *logp.Logger, resource *resource, canceler v2.Canceler) error {
if !resource.lock.TryLock() {
log.Infof("Resource '%v' currently in use, waiting...", resource.key)
err := resource.lock.LockContext(canceler)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
return []v2.Plugin{
cloudfoundry.Plugin(),
http_endpoint.Plugin(),
httpjson.Plugin(),
httpjson.Plugin(log, store),
o365audit.Plugin(log, store),
}
}
58 changes: 29 additions & 29 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

// Config contains information about httpjson configuration
// config contains information about httpjson configuration
type config struct {
OAuth2 *OAuth2 `config:"oauth2"`
OAuth2 *oauth2Config `config:"oauth2"`
APIKey string `config:"api_key"`
AuthenticationScheme string `config:"authentication_scheme"`
HTTPClientTimeout time.Duration `config:"http_client_timeout"`
Expand All @@ -30,98 +30,98 @@ type config struct {
JSONObjects string `config:"json_objects_array"`
SplitEventsBy string `config:"split_events_by"`
NoHTTPBody bool `config:"no_http_body"`
Pagination *Pagination `config:"pagination"`
RateLimit *RateLimit `config:"rate_limit"`
Pagination *paginationConfig `config:"pagination"`
RateLimit *rateLimitConfig `config:"rate_limit"`
RetryMax int `config:"retry.max_attempts"`
RetryWaitMin time.Duration `config:"retry.wait_min"`
RetryWaitMax time.Duration `config:"retry.wait_max"`
TLS *tlscommon.Config `config:"ssl"`
URL *URL `config:"url" validate:"required"`
DateCursor *DateCursor `config:"date_cursor"`
URL *urlConfig `config:"url" validate:"required"`
DateCursor *dateCursorConfig `config:"date_cursor"`
}

// Pagination contains information about httpjson pagination settings
type Pagination struct {
type paginationConfig struct {
Enabled *bool `config:"enabled"`
ExtraBodyContent common.MapStr `config:"extra_body_content"`
Header *Header `config:"header"`
Header *headerConfig `config:"header"`
IDField string `config:"id_field"`
RequestField string `config:"req_field"`
URLField string `config:"url_field"`
URL string `config:"url"`
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (p *Pagination) IsEnabled() bool {
func (p *paginationConfig) isEnabled() bool {
return p != nil && (p.Enabled == nil || *p.Enabled)
}

// HTTP Header information for pagination
type Header struct {
type headerConfig struct {
FieldName string `config:"field_name" validate:"required"`
RegexPattern *regexp.Regexp `config:"regex_pattern" validate:"required"`
}

// HTTP Header Rate Limit information
type RateLimit struct {
type rateLimitConfig struct {
Limit string `config:"limit"`
Reset string `config:"reset"`
Remaining string `config:"remaining"`
}

type DateCursor struct {
Enabled *bool `config:"enabled"`
Field string `config:"field"`
URLField string `config:"url_field" validate:"required"`
ValueTemplate *Template `config:"value_template"`
DateFormat string `config:"date_format"`
InitialInterval time.Duration `config:"initial_interval"`
type dateCursorConfig struct {
Enabled *bool `config:"enabled"`
Field string `config:"field"`
URLField string `config:"url_field" validate:"required"`
ValueTemplate *templateConfig `config:"value_template"`
DateFormat string `config:"date_format"`
InitialInterval time.Duration `config:"initial_interval"`
}

type Template struct {
type templateConfig struct {
*template.Template
}

func (t *Template) Unpack(in string) error {
func (t *templateConfig) Unpack(in string) error {
tpl, err := template.New("tpl").Parse(in)
if err != nil {
return err
}

*t = Template{Template: tpl}
*t = templateConfig{Template: tpl}

return nil
}

type URL struct {
type urlConfig struct {
*url.URL
}

func (u *URL) Unpack(in string) error {
func (u *urlConfig) Unpack(in string) error {
parsed, err := url.Parse(in)
if err != nil {
return err
}

*u = URL{URL: parsed}
*u = urlConfig{URL: parsed}

return nil
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (dc *DateCursor) IsEnabled() bool {
func (dc *dateCursorConfig) isEnabled() bool {
return dc != nil && (dc.Enabled == nil || *dc.Enabled)
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (dc *DateCursor) GetDateFormat() string {
func (dc *dateCursorConfig) getDateFormat() string {
if dc.DateFormat == "" {
return time.RFC3339
}
return dc.DateFormat
}

func (dc *DateCursor) Validate() error {
func (dc *dateCursorConfig) Validate() error {
if dc.DateFormat == "" {
return nil
}
Expand Down Expand Up @@ -154,15 +154,15 @@ func (c *config) Validate() error {
}
}
}
if c.OAuth2.IsEnabled() {
if c.OAuth2.isEnabled() {
if c.APIKey != "" || c.AuthenticationScheme != "" {
return errors.New("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously")
}
}
return nil
}

func defaultConfig() config {
func newDefaultConfig() config {
var c config
c.HTTPMethod = "GET"
c.HTTPClientTimeout = 60 * time.Second
Expand Down
73 changes: 37 additions & 36 deletions x-pack/filebeat/input/httpjson/config_oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,32 @@ import (
"golang.org/x/oauth2/google"
)

// An OAuth2Provider represents a supported oauth provider.
type OAuth2Provider string
// An oauth2Provider represents a supported oauth provider.
type oauth2Provider string

const (
OAuth2ProviderDefault OAuth2Provider = "" // OAuth2ProviderDefault means no specific provider is set.
OAuth2ProviderAzure OAuth2Provider = "azure" // OAuth2ProviderAzure AzureAD.
OAuth2ProviderGoogle OAuth2Provider = "google" // OAuth2ProviderGoogle Google.
oauth2ProviderDefault oauth2Provider = "" // OAuth2ProviderDefault means no specific provider is set.
oauth2ProviderAzure oauth2Provider = "azure" // OAuth2ProviderAzure AzureAD.
oauth2ProviderGoogle oauth2Provider = "google" // OAuth2ProviderGoogle Google.
)

func (p *OAuth2Provider) Unpack(in string) error {
*p = OAuth2Provider(in)
func (p *oauth2Provider) Unpack(in string) error {
*p = oauth2Provider(in)
return nil
}

func (p OAuth2Provider) canonical() OAuth2Provider {
return OAuth2Provider(strings.ToLower(string(p)))
func (p oauth2Provider) canonical() oauth2Provider {
return oauth2Provider(strings.ToLower(string(p)))
}

// OAuth2 contains information about oauth2 authentication settings.
type OAuth2 struct {
// oauth2Config contains information about oauth2 authentication settings.
type oauth2Config struct {
// common oauth fields
ClientID string `config:"client.id"`
ClientSecret string `config:"client.secret"`
Enabled *bool `config:"enabled"`
EndpointParams map[string][]string `config:"endpoint_params"`
Provider OAuth2Provider `config:"provider"`
Provider oauth2Provider `config:"provider"`
Scopes []string `config:"scopes"`
TokenURL string `config:"token_url"`

Expand All @@ -61,25 +61,26 @@ type OAuth2 struct {
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (o *OAuth2) IsEnabled() bool {
func (o *oauth2Config) isEnabled() bool {
return o != nil && (o.Enabled == nil || *o.Enabled)
}

// Client wraps the given http.Client and returns a new one that will use the oauth authentication.
func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client, error) {
ctx = context.WithValue(ctx, oauth2.HTTPClient, client)
func (o *oauth2Config) client(ctx context.Context, client *http.Client) (*http.Client, error) {
// only required to let oauth2 library to find our custom client in the context
ctx = context.WithValue(context.Background(), oauth2.HTTPClient, client)

switch o.GetProvider() {
case OAuth2ProviderAzure, OAuth2ProviderDefault:
switch o.getProvider() {
case oauth2ProviderAzure, oauth2ProviderDefault:
creds := clientcredentials.Config{
ClientID: o.ClientID,
ClientSecret: o.ClientSecret,
TokenURL: o.GetTokenURL(),
TokenURL: o.getTokenURL(),
Scopes: o.Scopes,
EndpointParams: o.GetEndpointParams(),
EndpointParams: o.getEndpointParams(),
}
return creds.Client(ctx), nil
case OAuth2ProviderGoogle:
case oauth2ProviderGoogle:
if o.GoogleJWTFile != "" {
cfg, err := google.JWTConfigFromJSON(o.GoogleCredentialsJSON, o.Scopes...)
if err != nil {
Expand All @@ -100,9 +101,9 @@ func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client,
}

// GetTokenURL returns the TokenURL.
func (o *OAuth2) GetTokenURL() string {
switch o.GetProvider() {
case OAuth2ProviderAzure:
func (o *oauth2Config) getTokenURL() string {
switch o.getProvider() {
case oauth2ProviderAzure:
if o.TokenURL == "" {
return endpoints.AzureAD(o.AzureTenantID).TokenURL
}
Expand All @@ -112,14 +113,14 @@ func (o *OAuth2) GetTokenURL() string {
}

// GetProvider returns provider in its canonical form.
func (o OAuth2) GetProvider() OAuth2Provider {
func (o oauth2Config) getProvider() oauth2Provider {
return o.Provider.canonical()
}

// GetEndpointParams returns endpoint params with any provider ones combined.
func (o OAuth2) GetEndpointParams() map[string][]string {
switch o.GetProvider() {
case OAuth2ProviderAzure:
func (o oauth2Config) getEndpointParams() map[string][]string {
switch o.getProvider() {
case oauth2ProviderAzure:
if o.AzureResource != "" {
if o.EndpointParams == nil {
o.EndpointParams = map[string][]string{}
Expand All @@ -132,26 +133,26 @@ func (o OAuth2) GetEndpointParams() map[string][]string {
}

// Validate checks if oauth2 config is valid.
func (o *OAuth2) Validate() error {
switch o.GetProvider() {
case OAuth2ProviderAzure:
func (o *oauth2Config) Validate() error {
switch o.getProvider() {
case oauth2ProviderAzure:
return o.validateAzureProvider()
case OAuth2ProviderGoogle:
case oauth2ProviderGoogle:
return o.validateGoogleProvider()
case OAuth2ProviderDefault:
case oauth2ProviderDefault:
if o.TokenURL == "" || o.ClientID == "" || o.ClientSecret == "" {
return errors.New("invalid configuration: both token_url and client credentials must be provided")
}
default:
return fmt.Errorf("invalid configuration: unknown provider %q", o.GetProvider())
return fmt.Errorf("invalid configuration: unknown provider %q", o.getProvider())
}
return nil
}

// findDefaultGoogleCredentials will default to google.FindDefaultCredentials and will only be changed for testing purposes
var findDefaultGoogleCredentials = google.FindDefaultCredentials

func (o *OAuth2) validateGoogleProvider() error {
func (o *oauth2Config) validateGoogleProvider() error {
if o.TokenURL != "" || o.ClientID != "" || o.ClientSecret != "" ||
o.AzureTenantID != "" || o.AzureResource != "" || len(o.EndpointParams) > 0 {
return errors.New("invalid configuration: none of token_url and client credentials can be used, use google.credentials_file, google.jwt_file, google.credentials_json or ADC instead")
Expand Down Expand Up @@ -191,7 +192,7 @@ func (o *OAuth2) validateGoogleProvider() error {
return fmt.Errorf("invalid configuration: no authentication credentials were configured or detected (ADC)")
}

func (o *OAuth2) populateCredentialsJSONFromFile(file string) error {
func (o *oauth2Config) populateCredentialsJSONFromFile(file string) error {
if _, err := os.Stat(file); os.IsNotExist(err) {
return fmt.Errorf("invalid configuration: the file %q cannot be found", file)
}
Expand All @@ -210,7 +211,7 @@ func (o *OAuth2) populateCredentialsJSONFromFile(file string) error {
return nil
}

func (o *OAuth2) validateAzureProvider() error {
func (o *oauth2Config) validateAzureProvider() error {
if o.TokenURL == "" && o.AzureTenantID == "" {
return errors.New("invalid configuration: at least one of token_url or tenant_id must be provided")
}
Expand Down
Loading