From 12c5ab469ac3b914b317cf83a52808bb3ac5a39f Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 5 Mar 2020 10:13:31 +0100 Subject: [PATCH 1/3] New input for Office 365 audit logs (#16244) This input uses Microsoft's Office 365 Management API to fetch audit events. Relates to #16196 (cherry picked from commit ed80900899a985f9aef90f3fa63607325aef5d15) --- CHANGELOG.next.asciidoc | 2 + go.mod | 1 + .../docs/inputs/input-o365audit.asciidoc | 134 ++++++ x-pack/filebeat/include/list.go | 1 + x-pack/filebeat/input/o365audit/auth/auth.go | 41 ++ x-pack/filebeat/input/o365audit/auth/cert.go | 66 +++ .../filebeat/input/o365audit/auth/secret.go | 25 ++ x-pack/filebeat/input/o365audit/config.go | 195 +++++++++ .../filebeat/input/o365audit/contentblob.go | 146 +++++++ .../input/o365audit/contentblob_test.go | 149 +++++++ x-pack/filebeat/input/o365audit/dates.go | 107 +++++ x-pack/filebeat/input/o365audit/input.go | 303 +++++++++++++ x-pack/filebeat/input/o365audit/listblobs.go | 297 +++++++++++++ .../input/o365audit/listblobs_test.go | 413 ++++++++++++++++++ x-pack/filebeat/input/o365audit/pagination.go | 65 +++ x-pack/filebeat/input/o365audit/poll/poll.go | 268 ++++++++++++ x-pack/filebeat/input/o365audit/schema.go | 66 +++ x-pack/filebeat/input/o365audit/state.go | 158 +++++++ x-pack/filebeat/input/o365audit/state_test.go | 105 +++++ x-pack/filebeat/input/o365audit/subscribe.go | 81 ++++ 20 files changed, 2623 insertions(+) create mode 100644 x-pack/filebeat/docs/inputs/input-o365audit.asciidoc create mode 100644 x-pack/filebeat/input/o365audit/auth/auth.go create mode 100644 x-pack/filebeat/input/o365audit/auth/cert.go create mode 100644 x-pack/filebeat/input/o365audit/auth/secret.go create mode 100644 x-pack/filebeat/input/o365audit/config.go create mode 100644 x-pack/filebeat/input/o365audit/contentblob.go create mode 100644 x-pack/filebeat/input/o365audit/contentblob_test.go create mode 100644 x-pack/filebeat/input/o365audit/dates.go create mode 100644 x-pack/filebeat/input/o365audit/input.go create mode 100644 x-pack/filebeat/input/o365audit/listblobs.go create mode 100644 x-pack/filebeat/input/o365audit/listblobs_test.go create mode 100644 x-pack/filebeat/input/o365audit/pagination.go create mode 100644 x-pack/filebeat/input/o365audit/poll/poll.go create mode 100644 x-pack/filebeat/input/o365audit/schema.go create mode 100644 x-pack/filebeat/input/o365audit/state.go create mode 100644 x-pack/filebeat/input/o365audit/state_test.go create mode 100644 x-pack/filebeat/input/o365audit/subscribe.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cbe63591979a..a60d4459bfaa 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -263,6 +263,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Release ActiveMQ module as GA. {issue}17047[17047] {pull}17049[17049] - Improve ECS categorization field mappings in iptables module. {issue}16166[16166] {pull}16637[16637] - Add pattern for Cisco ASA / FTD Message 734001 {issue}16212[16212] {pull}16612[16612] +- Allow users to override pipeline ID in fileset input config. {issue}9531[9531] {pull}16561[16561] +- Add `o365audit` input type for consuming events from Office 365 Management Activity API. 
{issue}16196[16196] {pull}16244[16244]

*Heartbeat*

diff --git a/go.mod b/go.mod
index 3d3297bd0bfa..e8c8e5346930 100644
--- a/go.mod
+++ b/go.mod
@@ -15,6 +15,7 @@ require (
 	github.com/Azure/azure-storage-blob-go v0.8.0
 	github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
 	github.com/Azure/go-autorest/autorest v0.9.4
+	github.com/Azure/go-autorest/autorest/adal v0.8.1
 	github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
 	github.com/Azure/go-autorest/autorest/date v0.2.0
 	github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5
diff --git a/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc b/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc
new file mode 100644
index 000000000000..aa1e5370b289
--- /dev/null
+++ b/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc
@@ -0,0 +1,134 @@
+[role="xpack"]
+
+:type: o365audit
+
+[id="{beatname_lc}-input-{type}"]
+=== Office 365 Management Activity API input
+
+++++
+Office 365 Management Activity API
+++++
+
+beta[]
+
+Use the `o365audit` input to retrieve audit messages from Office 365
+and Azure AD activity logs. These are the same logs that are available under
+_Audit_ _log_ _search_ in the _Security_ _and_ _Compliance_ center.
+
+A single input instance can be used to fetch events for multiple tenants, as
+long as a single application is configured to access all of them.
+Certificate-based authentication is recommended in this scenario.
+
+This input doesn't perform any transformation on the incoming messages;
+notably, no {ecs-ref}/ecs-reference.html[Elastic Common Schema fields] are
+populated, and some data is encoded as arrays of objects, which is difficult
+to query in Elasticsearch. You probably want to use the
+{filebeat-ref}/filebeat-module-o365.html[o365 module] instead.
+// TODO: link to O365 module docs.
+
+Example configuration:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: o365audit
+  application_id: my-application-id
+  tenant_id: my-tenant-id
+  client_secret: my-client-secret
+----
+
+Multi-tenancy and certificate-based authentication are also supported:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: o365audit
+  application_id: my-application-id
+  tenant_id:
+    - tenant-id-A
+    - tenant-id-B
+    - tenant-id-C
+  certificate: /path/to/cert.pem
+  key: /path/to/private.pem
+  # key_passphrase: "my key's password"
+----
+
+==== Configuration options
+
+The `o365audit` input supports the following configuration options plus the
+<<{beatname_lc}-input-{type}-common-options>> described later.
+
+[float]
+===== `application_id`
+
+The Application ID (also known as Client ID) of the Azure application to
+authenticate as.
+
+[float]
+===== `tenant_id`
+
+The tenant ID (also known as Directory ID) whose data is to be fetched. It's
+also possible to specify a list of tenant IDs to fetch data from more than
+one tenant.
+
+[float]
+===== `content_type`
+
+List of content types to fetch. The default is to fetch all known content
+types:
+
+- Audit.AzureActiveDirectory
+- Audit.Exchange
+- Audit.SharePoint
+- Audit.General
+- DLP.All
+
+[float]
+===== `client_secret`
+
+The client secret used for authentication.
+
+[float]
+===== `certificate`
+
+Path to the public certificate file used for certificate-based authentication.
+
+[float]
+===== `key`
+
+Path to the certificate's private key file for certificate-based
+authentication.
+
+[float]
+===== `key_passphrase`
+
+Passphrase used to decrypt the private key.
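+
+For example, a minimal sketch of a configuration that uses a certificate
+whose private key is encrypted (the IDs, paths, and passphrase are
+placeholders):
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: o365audit
+  application_id: my-application-id
+  tenant_id: my-tenant-id
+  certificate: /path/to/cert.pem
+  key: /path/to/private.pem
+  key_passphrase: my-passphrase
+----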
+
+[float]
+===== `api.authentication_endpoint`
+
+The authentication endpoint used to authorize the Azure app. This is
+`https://login.microsoftonline.com/` by default, and can be changed to access
+alternative endpoints.
+
+[float]
+===== `api.resource`
+
+The API resource to retrieve information from. This is
+`https://manage.office.com` by default, and can be changed to access
+alternative endpoints.
+
+[float]
+===== `api.max_retention`
+
+The maximum data retention period to support. `168h` (7 days) by default.
+{beatname_uc} will fetch all retained data for a tenant when run for the
+first time.
+
+[float]
+===== `api.poll_interval`
+
+The interval to wait before polling the API server for new events. Default
+`3m`.
+
+[float]
+===== `api.max_requests_per_minute`
+
+The maximum number of requests to perform per minute, for each tenant. The
+default is `2000`, as this is the server-side limit per tenant.
+
+[float]
+===== `api.max_query_size`
+
+The maximum time window that the API allows in a single query. Defaults to
+`24h` to match Microsoft's documented limit.
diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go
index eb5861b4f83e..7970538c0c47 100644
--- a/x-pack/filebeat/include/list.go
+++ b/x-pack/filebeat/include/list.go
@@ -13,6 +13,7 @@ import (
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/input/googlepubsub"
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow"
+	_ "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/input/s3"
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/module/activemq"
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/module/aws"
diff --git a/x-pack/filebeat/input/o365audit/auth/auth.go b/x-pack/filebeat/input/o365audit/auth/auth.go
new file mode 100644
index 000000000000..69899e340311
--- /dev/null
+++ b/x-pack/filebeat/input/o365audit/auth/auth.go
@@ -0,0 +1,41 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package auth
+
+import (
+	"github.com/Azure/go-autorest/autorest/adal"
+	"github.com/pkg/errors"
+)
+
+// TokenProvider is the interface that wraps an authentication mechanism and
+// allows obtaining tokens.
+type TokenProvider interface {
+	// Token returns a valid OAuth token, or an error.
+	Token() (string, error)
+
+	// Renew must be called to re-authenticate against the oauth2 endpoint
+	// when the API returns an authentication error.
+	Renew() error
+}
+
+// servicePrincipalToken extends adal.ServicePrincipalToken with the
+// TokenProvider interface.
+type servicePrincipalToken adal.ServicePrincipalToken
+
+// Token returns an oauth token that can be used for bearer authorization.
+func (provider *servicePrincipalToken) Token() (string, error) {
+	inner := (*adal.ServicePrincipalToken)(provider)
+	if err := inner.EnsureFresh(); err != nil {
+		return "", errors.Wrap(err, "refreshing spt token")
+	}
+	token := inner.Token()
+	return token.OAuthToken(), nil
+}
+
+// Renew re-authenticates with the oauth2 endpoint to get a new Service
+// Principal Token.
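+//
+// A sketch of the intended caller flow (the names here are illustrative,
+// not part of this package):
+//
+//	tok, err := provider.Token() // tok is used as the Bearer credential
+//	// ... the API responds with 401 Unauthorized ...
+//	err = provider.Renew() // then retry the request with a fresh Token()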
+func (provider *servicePrincipalToken) Renew() error { + inner := (*adal.ServicePrincipalToken)(provider) + return inner.Refresh() +} diff --git a/x-pack/filebeat/input/o365audit/auth/cert.go b/x-pack/filebeat/input/o365audit/auth/cert.go new file mode 100644 index 000000000000..dc8e1584a3a0 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/auth/cert.go @@ -0,0 +1,66 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package auth + +import ( + "crypto/rsa" + "crypto/x509" + "fmt" + + "github.com/Azure/go-autorest/autorest/adal" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" +) + +// NewProviderFromCertificate returns a TokenProvider that uses certificate-based +// authentication. +func NewProviderFromCertificate( + endpoint, resource, applicationID, tenantID string, + conf tlscommon.CertificateConfig) (sptp TokenProvider, err error) { + cert, privKey, err := loadConfigCerts(conf) + if err != nil { + return nil, errors.Wrap(err, "failed loading certificates") + } + oauth, err := adal.NewOAuthConfig(endpoint, tenantID) + if err != nil { + return nil, errors.Wrap(err, "error generating OAuthConfig") + } + + spt, err := adal.NewServicePrincipalTokenFromCertificate( + *oauth, + applicationID, + cert, + privKey, + resource, + ) + if err != nil { + return nil, err + } + spt.SetAutoRefresh(true) + return (*servicePrincipalToken)(spt), nil +} + +func loadConfigCerts(cfg tlscommon.CertificateConfig) (cert *x509.Certificate, key *rsa.PrivateKey, err error) { + tlsCert, err := tlscommon.LoadCertificate(&cfg) + if err != nil { + return nil, nil, errors.Wrapf(err, "error loading X509 certificate from '%s'", cfg.Certificate) + } + if tlsCert == nil || len(tlsCert.Certificate) == 0 { + return nil, nil, fmt.Errorf("no certificates loaded from '%s'", cfg.Certificate) + } + cert, err = x509.ParseCertificate(tlsCert.Certificate[0]) + if err != nil { + return nil, nil, errors.Wrapf(err, "error parsing X509 certificate from '%s'", cfg.Certificate) + } + if tlsCert.PrivateKey == nil { + return nil, nil, fmt.Errorf("failed loading private key from '%s'", cfg.Key) + } + key, ok := tlsCert.PrivateKey.(*rsa.PrivateKey) + if !ok { + return nil, nil, fmt.Errorf("private key at '%s' is not an RSA private key", cfg.Key) + } + return cert, key, nil +} diff --git a/x-pack/filebeat/input/o365audit/auth/secret.go b/x-pack/filebeat/input/o365audit/auth/secret.go new file mode 100644 index 000000000000..c34d6d48cc66 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/auth/secret.go @@ -0,0 +1,25 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package auth + +import ( + "github.com/Azure/go-autorest/autorest/adal" + "github.com/pkg/errors" +) + +// NewProviderFromClientSecret returns a token provider that uses a secret +// for authentication. 
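+// The secret is the client secret (API key) generated for the Azure
+// application; the resulting token is refreshed automatically when it
+// expires (SetAutoRefresh is enabled below).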
+func NewProviderFromClientSecret(endpoint, resource, applicationID, tenantID, secret string) (p TokenProvider, err error) {
+	oauth, err := adal.NewOAuthConfig(endpoint, tenantID)
+	if err != nil {
+		return nil, errors.Wrap(err, "error generating OAuthConfig")
+	}
+	spt, err := adal.NewServicePrincipalToken(*oauth, applicationID, secret, resource)
+	if err != nil {
+		return nil, err
+	}
+	spt.SetAutoRefresh(true)
+	return (*servicePrincipalToken)(spt), nil
+}
diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go
new file mode 100644
index 000000000000..f30e368a9e26
--- /dev/null
+++ b/x-pack/filebeat/input/o365audit/config.go
@@ -0,0 +1,195 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package o365audit
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
+	"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/auth"
+)
+
+// Config for the O365 audit API input.
+type Config struct {
+	// CertificateConfig contains the authentication credentials (certificate).
+	CertificateConfig tlscommon.CertificateConfig `config:",inline"`
+
+	// ApplicationID (aka. client ID) of the Azure application.
+	ApplicationID string `config:"application_id" validate:"required"`
+
+	// ClientSecret (aka. API key) to use for authentication.
+	ClientSecret string `config:"client_secret"`
+
+	// TenantID (aka. Directory ID) is a list of tenants for which to fetch
+	// the audit logs. This can be a string or a list of strings.
+	TenantID stringList `config:"tenant_id,replace" validate:"required"`
+
+	// ContentType is a list of content types to fetch.
+	// This can be a string or a list of strings.
+	ContentType stringList `config:"content_type,replace"`
+
+	// API contains settings to adapt to changes on the API.
+	API APIConfig `config:"api"`
+}
+
+// APIConfig contains advanced settings that are only supposed to be changed
+// to diagnose errors or to adapt to changes in the service.
+type APIConfig struct {
+
+	// AuthenticationEndpoint to authorize the Azure app.
+	AuthenticationEndpoint string `config:"authentication_endpoint"`
+
+	// Resource to request authorization for.
+	Resource string `config:"resource"`
+
+	// MaxRetention determines how far back the input will poll for events.
+	MaxRetention time.Duration `config:"max_retention" validate:"positive"`
+
+	// AdjustClock controls whether the input will adapt its internal clock
+	// to the server's clock to compensate for clock differences when the API
+	// returns an error indicating that the requested times are out of bounds.
+	AdjustClock bool `config:"adjust_clock"`
+
+	// AdjustClockMinDifference sets the minimum difference between clocks so
+	// that an adjustment is considered.
+	AdjustClockMinDifference time.Duration `config:"adjust_clock_min_difference" validate:"positive"`
+
+	// AdjustClockWarn controls whether a warning should be printed to the logs
+	// when a clock difference between the local clock and the server's clock
+	// is detected, as it can lead to event loss.
+	AdjustClockWarn bool `config:"adjust_clock_warn"`
+
+	// ErrorRetryInterval sets the interval between retries in the case of
+	// errors performing a request.
+	ErrorRetryInterval time.Duration `config:"error_retry_interval" validate:"positive"`
+
+	// PollInterval determines how often the input should poll for new
+	// data once it has finished scanning for past events and reached the live
+	// window.
+	PollInterval time.Duration `config:"poll_interval" validate:"positive"`
+
+	// MaxRequestsPerMinute sets the limit on the number of API requests that
+	// can be sent, per tenant.
+	MaxRequestsPerMinute int `config:"max_requests_per_minute" validate:"positive"`
+
+	// SetIDFromAuditRecord controls whether the unique "Id" field in the audit
+	// record is used as the document ID for ingestion. This helps avoid
+	// duplicates.
+	SetIDFromAuditRecord bool `config:"set_id_from_audit_record"`
+
+	// MaxQuerySize is the maximum time window that can be queried. The default
+	// is 24h.
+	MaxQuerySize time.Duration `config:"max_query_size" validate:"positive"`
+}
+
+func defaultConfig() Config {
+	return Config{
+
+		// All documented content types.
+		ContentType: []string{
+			"Audit.AzureActiveDirectory",
+			"Audit.Exchange",
+			"Audit.SharePoint",
+			"Audit.General",
+			"DLP.All",
+		},
+
+		API: APIConfig{
+			// This is used to bootstrap the input for the first time
+			// as the API doesn't provide a way to query for the oldest record.
+			// Currently the API will error on queries older than this; use
+			// with care.
+			MaxRetention: 7 * timeDay,
+
+			AuthenticationEndpoint: "https://login.microsoftonline.com/",
+
+			Resource: "https://manage.office.com",
+
+			AdjustClock: true,
+
+			AdjustClockMinDifference: 5 * time.Minute,
+
+			AdjustClockWarn: true,
+
+			ErrorRetryInterval: 5 * time.Minute,
+
+			PollInterval: 3 * time.Minute,
+
+			MaxQuerySize: timeDay,
+
+			// According to the docs this is the max number of requests
+			// allowed per tenant per minute.
+			MaxRequestsPerMinute: 2000,
+
+			SetIDFromAuditRecord: true,
+		},
+	}
+}
+
+// Validate checks that the configuration is correct.
+func (c *Config) Validate() (err error) {
+	hasSecret := c.ClientSecret != ""
+	hasCert := c.CertificateConfig.Certificate != ""
+
+	if !hasSecret && !hasCert {
+		return errors.New("no authentication configured. Configure a client_secret or a certificate and key.")
+	}
+	if hasSecret && hasCert {
+		return errors.New("both client_secret and certificate are configured. Only one authentication method can be used.")
+	}
+	if hasCert {
+		if err = c.CertificateConfig.Validate(); err != nil {
+			return errors.Wrap(err, "invalid certificate config")
+		}
+	}
+	return nil
+}
+
+type stringList []string
+
+// Unpack populates the stringList with either a single string value or an array.
+func (s *stringList) Unpack(value interface{}) error {
+	switch v := value.(type) {
+	case string:
+		*s = []string{v}
+	case []string:
+		*s = v
+	case []interface{}:
+		*s = make([]string, len(v))
+		for idx, ival := range v {
+			str, ok := ival.(string)
+			if !ok {
+				return fmt.Errorf("string value required. Found %v (type %T) at position %d",
+					ival, ival, idx+1)
+			}
+			(*s)[idx] = str
+		}
+	default:
+		return fmt.Errorf("array of strings required. Found %v (type %T)", value, value)
+	}
+	return nil
+}
+
+// NewTokenProvider returns an auth.TokenProvider for the given tenantID.
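+// A client secret, when configured, selects secret-based authentication;
+// otherwise certificate-based authentication is used. Validate ensures that
+// exactly one of the two methods is configured.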
+func (c *Config) NewTokenProvider(tenantID string) (auth.TokenProvider, error) {
+	if c.ClientSecret != "" {
+		return auth.NewProviderFromClientSecret(
+			c.API.AuthenticationEndpoint,
+			c.API.Resource,
+			c.ApplicationID,
+			tenantID,
+			c.ClientSecret,
+		)
+	}
+	return auth.NewProviderFromCertificate(
+		c.API.AuthenticationEndpoint,
+		c.API.Resource,
+		c.ApplicationID,
+		tenantID,
+		c.CertificateConfig,
+	)
+}
diff --git a/x-pack/filebeat/input/o365audit/contentblob.go b/x-pack/filebeat/input/o365audit/contentblob.go
new file mode 100644
index 000000000000..44ddb911f461
--- /dev/null
+++ b/x-pack/filebeat/input/o365audit/contentblob.go
@@ -0,0 +1,146 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package o365audit
+
+import (
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/Azure/go-autorest/autorest"
+	"github.com/pkg/errors"
+
+	"github.com/elastic/beats/v7/libbeat/common"
+	"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll"
+)
+
+// contentBlob is a poll.Transaction that processes "content blobs":
+// aggregations of audit event objects returned by the API.
+type contentBlob struct {
+	env     apiEnvironment
+	id, url string
+	// cursor is used to ACK the resulting events.
+	cursor cursor
+	// skipLines is used when resuming from a saved cursor so that already
+	// acknowledged objects are not duplicated.
+	skipLines int
+}
+
+// String returns a printable representation of this transaction.
+func (c contentBlob) String() string {
+	return fmt.Sprintf("content blob url:%s id:%s", c.url, c.id)
+}
+
+// RequestDecorators returns the decorators used to perform a request.
+func (c contentBlob) RequestDecorators() []autorest.PrepareDecorator {
+	return []autorest.PrepareDecorator{
+		autorest.WithBaseURL(c.url),
+	}
+}
+
+// Delay returns the delay before performing this request.
+func (c contentBlob) Delay() time.Duration {
+	return 0
+}
+
+// OnResponse parses the response for a content blob.
+func (c contentBlob) OnResponse(response *http.Response) (actions []poll.Action) {
+	if response.StatusCode != 200 {
+		return c.handleError(response)
+	}
+	var js []common.MapStr
+	if err := readJSONBody(response, &js); err != nil {
+		return append(actions, poll.Terminate(errors.Wrap(err, "reading body failed")))
+	}
+	for idx, entry := range js {
+		id, _ := getString(entry, "Id")
+		ts, _ := getString(entry, "CreationTime")
+		c.env.Logger.Debugf(" > event %d: created:%s id:%s for %s", idx+1, ts, id, c.cursor)
+	}
+	if len(js) > c.skipLines {
+		for _, entry := range js[:c.skipLines] {
+			id, _ := getString(entry, "Id")
+			c.env.Logger.Debugf("Skipping event %s [%s] for %s", c.cursor, id, c.id)
+		}
+		for _, entry := range js[c.skipLines:] {
+			c.cursor = c.cursor.ForNextLine()
+			c.env.Logger.Debugf("Reporting event %s for %s", c.cursor, c.id)
+			actions = append(actions, c.env.Report(entry, c.cursor))
+		}
+		c.skipLines = 0
+	} else {
+		for _, entry := range js {
+			id, _ := getString(entry, "Id")
+			c.env.Logger.Debugf("Skipping event %s [%s] for %s", c.cursor, id, c.id)
+		}

+		c.skipLines -= len(js)
+	}
+	// The API only documents the use of the NextPageUri header for list
+	// requests, but one can't be too careful.
+ if url, found := getNextPage(response); found { + return append(actions, poll.Fetch(newPager(url, c))) + } + + return actions +} + +func (c contentBlob) handleError(response *http.Response) (actions []poll.Action) { + var msg apiError + readJSONBody(response, &msg) + c.env.Logger.Warnf("Got error %s: %+v", response.Status, msg) + + if _, found := fatalErrors[msg.Error.Code]; found { + return []poll.Action{ + c.env.ReportAPIError(msg), + poll.Terminate(errors.New(msg.Error.Message)), + } + } + + switch response.StatusCode { + case 401: // Authentication error. Renew oauth token and repeat this op. + return []poll.Action{ + poll.RenewToken(), + poll.Fetch(withDelay{contentBlob: c, delay: c.env.Config.PollInterval}), + } + case 404: + return nil + } + if msg.Error.Code != "" { + actions = append(actions, c.env.ReportAPIError(msg)) + } + return append(actions, poll.Fetch(withDelay{contentBlob: c, delay: c.env.Config.ErrorRetryInterval})) +} + +// ContentBlob creates a new contentBlob. +func ContentBlob(url string, cursor cursor, env apiEnvironment) contentBlob { + return contentBlob{ + url: url, + env: env, + cursor: cursor, + } +} + +// WithID configures a content blob with the given origin ID. +func (c contentBlob) WithID(id string) contentBlob { + c.id = id + return c +} + +// WithSkipLines configures a content blob with the number of objects to skip. +func (c contentBlob) WithSkipLines(nlines int) contentBlob { + c.skipLines = nlines + return c +} + +type withDelay struct { + contentBlob + delay time.Duration +} + +// Delay overrides the contentBlob's delay. +func (w withDelay) Delay() time.Duration { + return w.delay +} diff --git a/x-pack/filebeat/input/o365audit/contentblob_test.go b/x-pack/filebeat/input/o365audit/contentblob_test.go new file mode 100644 index 000000000000..1a08c69fb362 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/contentblob_test.go @@ -0,0 +1,149 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package o365audit + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" +) + +type contentStore struct { + events []beat.Event + stopped bool +} + +func (s *contentStore) onEvent(b beat.Event) bool { + s.events = append(s.events, b) + return !s.stopped +} + +func (f *fakePoll) BlobContent(t testing.TB, b poll.Transaction, data []common.MapStr, nextUrl string) poll.Transaction { + urls, next := f.deliverResult(t, b, data, nextUrl) + if !assert.Empty(t, urls) { + t.Fatal("blob returned urls to fetch") + } + return next +} + +func makeEvent(ts time.Time, id string) common.MapStr { + return common.MapStr{ + "CreationTime": ts.Format(apiDateFormat), + "Id": id, + } +} + +func validateBlobs(t testing.TB, store contentStore, expected []string, c cursor) cursor { + assert.Len(t, store.events, len(expected)) + for idx := range expected { + id, err := getString(store.events[idx].Fields, fieldsPrefix+".Id") + if !assert.NoError(t, err) { + t.Fatal(err) + } + assert.Equal(t, expected[idx], id) + } + prev := c + baseLine := c.line + for idx, id := range expected { + ev := store.events[idx] + cursor, ok := ev.Private.(cursor) + if !assert.True(t, ok) { + t.Fatal("no cursor for event id", id) + } + assert.Equal(t, idx+1+baseLine, cursor.line) + assert.True(t, prev.Before(cursor)) + prev = cursor + } + return prev +} + +func TestContentBlob(t *testing.T) { + var f fakePoll + var store contentStore + ctx := apiEnvironment{ + Logger: logp.L(), + Callback: store.onEvent, + } + baseCursor := newCursor(stream{"myTenant", "contentype"}, time.Now()) + query := ContentBlob("http://test.localhost/", baseCursor, ctx) + data := []common.MapStr{ + makeEvent(now.Add(-time.Hour), "e1"), + makeEvent(now.Add(-2*time.Hour), "e2"), + makeEvent(now.Add(-30*time.Minute), "e3"), + makeEvent(now.Add(-10*time.Second), "e4"), + makeEvent(now.Add(-20*time.Minute), "e5"), + } + expected := []string{"e1", "e2", "e3", "e4", "e5"} + next := f.BlobContent(t, query, data, "") + assert.Nil(t, next) + c := validateBlobs(t, store, expected, baseCursor) + assert.Equal(t, len(expected), c.line) +} + +func TestContentBlobResumeToLine(t *testing.T) { + var f fakePoll + var store contentStore + ctx := testConfig() + ctx.Callback = store.onEvent + baseCursor := newCursor(stream{"myTenant", "contentype"}, time.Now()) + const skip = 3 + baseCursor.line = skip + query := ContentBlob("http://test.localhost/", baseCursor, ctx).WithSkipLines(skip) + data := []common.MapStr{ + makeEvent(now.Add(-time.Hour), "e1"), + makeEvent(now.Add(-2*time.Hour), "e2"), + makeEvent(now.Add(-30*time.Minute), "e3"), + makeEvent(now.Add(-10*time.Second), "e4"), + makeEvent(now.Add(-20*time.Minute), "e5"), + } + expected := []string{"e4", "e5"} + next := f.BlobContent(t, query, data, "") + assert.Nil(t, next) + c := validateBlobs(t, store, expected, baseCursor) + assert.Equal(t, len(expected), c.line-skip) +} + +func TestContentBlobPaged(t *testing.T) { + var f fakePoll + var store contentStore + ctx := apiEnvironment{ + Logger: logp.L(), + Callback: store.onEvent, + } + baseCursor := newCursor(stream{"myTenant", "contentype"}, time.Now()) + query := ContentBlob("http://test.localhost/", baseCursor, ctx) + data := []common.MapStr{ + makeEvent(now.Add(-time.Hour), "e1"), + makeEvent(now.Add(-2*time.Hour), "e2"), + 
makeEvent(now.Add(-30*time.Minute), "e3"), + makeEvent(now.Add(-10*time.Second), "e4"), + makeEvent(now.Add(-20*time.Minute), "e5"), + makeEvent(now.Add(-20*time.Minute), "e6"), + } + expected := []string{"e1", "e2", "e3"} + next := f.BlobContent(t, query, data[:3], "http://test.localhost/page/2") + assert.NotNil(t, next) + assert.IsType(t, paginator{}, next) + c := validateBlobs(t, store, expected, baseCursor) + assert.Equal(t, 3, c.line) + store.events = nil + next = f.BlobContent(t, next, data[3:5], "http://test.localhost/page/3") + assert.IsType(t, paginator{}, next) + expected = []string{"e4", "e5"} + c = validateBlobs(t, store, expected, c) + assert.Equal(t, 5, c.line) + store.events = nil + next = f.BlobContent(t, next, data[5:], "") + assert.Nil(t, next) + expected = []string{"e6"} + c = validateBlobs(t, store, expected, c) + assert.Equal(t, 6, c.line) +} diff --git a/x-pack/filebeat/input/o365audit/dates.go b/x-pack/filebeat/input/o365audit/dates.go new file mode 100644 index 000000000000..5eb53d4d6dec --- /dev/null +++ b/x-pack/filebeat/input/o365audit/dates.go @@ -0,0 +1,107 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "fmt" + "sort" + "time" + + "github.com/joeshaw/multierror" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" +) + +const ( + // Date format used by audit objects. + apiDateFormat = "2006-01-02T15:04:05" + timeDay = time.Hour * 24 +) + +var ( + errTypeCastFailed = errors.New("key is not expected type") +) + +// Date formats used in the JSON objects returned by the API. +// This is just a safeguard in case the date format used by the API is +// updated to include sub-second resolution or timezone information. +var apiDateFormats = dateFormats{ + apiDateFormat, + apiDateFormat + "Z", + time.RFC3339Nano, + time.RFC3339, +} + +// Date formats used by HTTP/1.1 servers. +var httpDateFormats = dateFormats{ + time.RFC1123, + time.RFC850, + time.ANSIC, + time.RFC1123Z, +} + +// A helper to parse dates using different formats. +type dateFormats []string + +// Parse will try to parse the given string-formatted date in the formats +// specified in the dateFormats until one succeeds. +func (d dateFormats) Parse(str string) (t time.Time, err error) { + for _, fmt := range d { + if t, err = time.Parse(fmt, str); err == nil { + return t.UTC(), nil + } + } + return time.Now().UTC(), fmt.Errorf("unable to parse date '%s' with formats %v", str, d) +} + +// Get a key from a map and cast it to string. +func getString(m common.MapStr, key string) (string, error) { + iValue, err := m.GetValue(key) + if err != nil { + return "", err + } + str, ok := iValue.(string) + if !ok { + return "", errTypeCastFailed + } + return str, nil +} + +// Parse a date from the given map key. +func getDateKey(m common.MapStr, key string, formats dateFormats) (t time.Time, err error) { + str, err := getString(m, key) + if err != nil { + return t, err + } + return formats.Parse(str) +} + +// Sort a slice of maps by one of its keys parsed as a date in the given format(s). 
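+// Entries whose date fails to parse are treated as the zero time, and the
+// parse errors are collected and returned once sorting completes.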
+func sortMapSliceByDate(s []common.MapStr, dateKey string, formats dateFormats) error { + var errs multierror.Errors + sort.Slice(s, func(i, j int) bool { + di, e1 := getDateKey(s[i], dateKey, formats) + dj, e2 := getDateKey(s[j], dateKey, formats) + if e1 != nil { + errs = append(errs, e1) + } + if e2 != nil { + errs = append(errs, e2) + } + return di.Before(dj) + }) + return errors.Wrapf(errs.Err(), "failed sorting by date key:%s", dateKey) +} + +func inRange(d, maxLimit time.Duration) bool { + if maxLimit < 0 { + maxLimit = -maxLimit + } + if d < 0 { + d = -d + } + return d < maxLimit +} diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go new file mode 100644 index 000000000000..cafba2184f3a --- /dev/null +++ b/x-pack/filebeat/input/o365audit/input.go @@ -0,0 +1,303 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "context" + "sync" + "time" + + "github.com/Azure/go-autorest/autorest" + "github.com/joeshaw/multierror" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/filebeat/channel" + "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/libbeat/common/useragent" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" +) + +const ( + inputName = "o365audit" + fieldsPrefix = inputName +) + +func init() { + if err := input.Register(inputName, NewInput); err != nil { + panic(errors.Wrapf(err, "unable to create %s input", inputName)) + } +} + +type o365input struct { + config Config + outlet channel.Outleter + storage *stateStorage + log *logp.Logger + pollers map[stream]*poll.Poller + cancel func() + ctx context.Context + wg sync.WaitGroup + runOnce sync.Once +} + +type apiEnvironment struct { + TenantID string + ContentType string + Config APIConfig + Callback func(beat.Event) bool + Logger *logp.Logger + Clock func() time.Time +} + +// NewInput creates a new o365audit input. +func NewInput( + cfg *common.Config, + connector channel.Connector, + inputContext input.Context, +) (inp input.Input, err error) { + cfgwarn.Beta("The %s input is beta", inputName) + inp, err = newInput(cfg, connector, inputContext) + return inp, errors.Wrap(err, inputName) +} + +func newInput( + cfg *common.Config, + connector channel.Connector, + inputContext input.Context, +) (inp input.Input, err error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, errors.Wrap(err, "reading config") + } + + log := logp.NewLogger(inputName) + + // TODO: Update with input v2 state. + storage := newStateStorage(noopPersister{}) + + var out channel.Outleter + out, err = connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: inputContext.DynamicFields, + }, + ACKLastEvent: func(private interface{}) { + // Errors don't have a cursor. 
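+			// Only events that carry a cursor in their Private field can
+			// advance the persisted state.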
+ if cursor, ok := private.(cursor); ok { + log.Debugf("ACKed cursor %+v", cursor) + if err := storage.Save(cursor); err != nil && err != errNoUpdate { + log.Errorf("Error saving state: %v", err) + } + } + }, + }) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + if err != nil { + cancel() + } + }() + + pollers := make(map[stream]*poll.Poller) + for _, tenantID := range config.TenantID { + // MaxRequestsPerMinute limitation is per tenant. + delay := time.Duration(len(config.ContentType)) * time.Minute / time.Duration(config.API.MaxRequestsPerMinute) + auth, err := config.NewTokenProvider(tenantID) + if err != nil { + return nil, err + } + if _, err = auth.Token(); err != nil { + return nil, errors.Wrapf(err, "unable to acquire authentication token for tenant:%s", tenantID) + } + for _, contentType := range config.ContentType { + key := stream{ + tenantID: tenantID, + contentType: contentType, + } + poller, err := poll.New( + poll.WithTokenProvider(auth), + poll.WithMinRequestInterval(delay), + poll.WithLogger(log.With("tenantID", tenantID, "contentType", contentType)), + poll.WithContext(ctx), + poll.WithRequestDecorator( + autorest.WithUserAgent(useragent.UserAgent("Filebeat-"+inputName)), + autorest.WithQueryParameters(common.MapStr{ + "publisherIdentifier": tenantID, + }), + ), + ) + if err != nil { + return nil, errors.Wrap(err, "failed to create API poller") + } + pollers[key] = poller + } + } + + return &o365input{ + config: config, + outlet: out, + storage: storage, + log: log, + pollers: pollers, + ctx: ctx, + cancel: cancel, + }, nil +} + +// Run starts the o365input. Only has effect the first time it's called. +func (inp *o365input) Run() { + inp.runOnce.Do(inp.run) +} + +func (inp *o365input) run() { + for stream, poller := range inp.pollers { + start := inp.loadLastLocation(stream) + inp.log.Infow("Start fetching events", + "cursor", start, + "tenantID", stream.tenantID, + "contentType", stream.contentType) + inp.runPoller(poller, start) + } +} + +func (inp *o365input) runPoller(poller *poll.Poller, start cursor) { + ctx := apiEnvironment{ + TenantID: start.tenantID, + ContentType: start.contentType, + Config: inp.config.API, + Callback: inp.reportEvent, + Logger: poller.Logger(), + Clock: time.Now, + } + inp.wg.Add(1) + go func() { + defer logp.Recover("panic in " + inputName + " runner.") + defer inp.wg.Done() + action := ListBlob(start, ctx) + // When resuming from a saved state, it's necessary to query for the + // same startTime that provided the last ACKed event. Otherwise there's + // the risk of observing partial blobs with different line counts, due to + // how the backend works. + if start.line > 0 { + action = action.WithStartTime(start.startTime) + } + if err := poller.Run(action); err != nil { + ctx.Logger.Errorf("API polling terminated with error: %v", err.Error()) + msg := common.MapStr{} + msg.Put("error.message", err.Error()) + msg.Put("event.kind", "pipeline_error") + event := beat.Event{ + Timestamp: time.Now(), + Fields: msg, + } + inp.reportEvent(event) + } + }() +} + +func (inp *o365input) reportEvent(event beat.Event) bool { + return inp.outlet.OnEvent(event) +} + +// Stop terminates the o365 input. +func (inp *o365input) Stop() { + inp.log.Info("Stopping input " + inputName) + defer inp.log.Info(inputName + " stopped.") + defer inp.outlet.Close() + inp.cancel() +} + +// Wait terminates the o365input and waits for all the pollers to finalize. 
+func (inp *o365input) Wait() {
+	inp.Stop()
+	inp.wg.Wait()
+}
+
+func (inp *o365input) loadLastLocation(key stream) cursor {
+	period := inp.config.API.MaxRetention
+	retentionLimit := time.Now().UTC().Add(-period)
+	cursor, err := inp.storage.Load(key)
+	if err != nil {
+		if err == errStateNotFound {
+			inp.log.Infof("No saved state found. Will fetch events for the last %v.", period.String())
+		} else {
+			inp.log.Errorw("Error loading saved state. Will fetch all retained events. "+
+				"Depending on max_retention, this can cause event loss or duplication.",
+				"error", err,
+				"max_retention", period.String())
+		}
+		cursor.timestamp = retentionLimit
+	}
+	if cursor.timestamp.Before(retentionLimit) {
+		inp.log.Warnw("Last update exceeds the retention limit. "+
+			"Probably some events have been lost.",
+			"resume_since", cursor,
+			"retention_limit", retentionLimit,
+			"max_retention", period.String())
+		// Due to API limitations, it's necessary to perform a query for each
+		// day. This avoids performing a lot of queries that would return
+		// empty results when the input hasn't run in a long time.
+		cursor.timestamp = retentionLimit
+	}
+	return cursor
+}
+
+var errTerminated = errors.New("terminated due to output closed")
+
+// Report returns an action that produces a beat.Event from the given object.
+func (env apiEnvironment) Report(doc common.MapStr, private interface{}) poll.Action {
+	return func(poll.Enqueuer) error {
+		if !env.Callback(env.toBeatEvent(doc, private)) {
+			return errTerminated
+		}
+		return nil
+	}
+}
+
+// ReportAPIError returns an action that produces a beat.Event from an API error.
+func (env apiEnvironment) ReportAPIError(err apiError) poll.Action {
+	return func(poll.Enqueuer) error {
+		if !env.Callback(err.ToBeatEvent()) {
+			return errTerminated
+		}
+		return nil
+	}
+}
+
+func (env apiEnvironment) toBeatEvent(doc common.MapStr, private interface{}) beat.Event {
+	var errs multierror.Errors
+	ts, err := getDateKey(doc, "CreationTime", apiDateFormats)
+	if err != nil {
+		ts = time.Now()
+		errs = append(errs, errors.Wrap(err, "failed parsing CreationTime"))
+	}
+	b := beat.Event{
+		Timestamp: ts,
+		Fields: common.MapStr{
+			fieldsPrefix: doc,
+		},
+		Private: private,
+	}
+	if env.Config.SetIDFromAuditRecord {
+		if id, err := getString(doc, "Id"); err == nil && len(id) > 0 {
+			b.SetID(id)
+		}
+	}
+	if len(errs) > 0 {
+		msgs := make([]string, len(errs))
+		for idx, e := range errs {
+			msgs[idx] = e.Error()
+		}
+		b.PutValue("error.message", msgs)
+	}
+	return b
+}
diff --git a/x-pack/filebeat/input/o365audit/listblobs.go b/x-pack/filebeat/input/o365audit/listblobs.go
new file mode 100644
index 000000000000..5be65a8d67d3
--- /dev/null
+++ b/x-pack/filebeat/input/o365audit/listblobs.go
@@ -0,0 +1,297 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package o365audit
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"sort"
+	"time"
+
+	"github.com/Azure/go-autorest/autorest"
+	"github.com/pkg/errors"
+
+	"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll"
+)
+
+// listBlob is a poll.Transaction that handles the content/"blobs" list.
+type listBlob struct {
+	cursor             cursor
+	startTime, endTime time.Time
+	delay              time.Duration
+	env                apiEnvironment
+}
+
+// ListBlob creates a new poll.Transaction that lists content starting from
+// the given cursor position.
+func ListBlob(cursor cursor, env apiEnvironment) listBlob {
+	l := listBlob{
+		cursor: cursor,
+		env:    env,
+	}
+	return l.adjustTimes(cursor.timestamp)
+}
+
+// WithStartTime allows altering the startTime of a listBlob. This is necessary
+// for requests that are resuming from the cursor position of an existing blob,
+// as it has been observed that the server won't return the same blob, but a
+// partial one, when queried with the time that this blob was created.
+func (l listBlob) WithStartTime(start time.Time) listBlob {
+	return l.adjustTimes(start)
+}
+
+func (l listBlob) adjustTimes(since time.Time) listBlob {
+	now := l.env.Clock()
+	// Can't query more than the maximum retention period in the past.
+	fromLimit := now.Add(-l.env.Config.MaxRetention)
+	if since.Before(fromLimit) {
+		since = fromLimit
+	}
+
+	to := since.Add(l.env.Config.MaxQuerySize)
+	// Can't query into the future. Poll for new events every interval.
+	var delay time.Duration
+	if to.After(now) {
+		since = now.Add(-l.env.Config.MaxQuerySize)
+		if since.Before(l.cursor.timestamp) {
+			since = l.cursor.timestamp
+		}
+		to = now
+		delay = l.env.Config.PollInterval
+	}
+	l.startTime = since.UTC()
+	l.endTime = to.UTC()
+	l.delay = delay
+	return l
+}
+
+// Delay returns the delay before executing a transaction.
+func (l listBlob) Delay() time.Duration {
+	return l.delay
+}
+
+// String returns the printable representation of a listBlob.
+func (l listBlob) String() string {
+	return fmt.Sprintf("list blobs from:%s to:%s", l.startTime, l.endTime)
+}
+
+// RequestDecorators returns the decorators used to perform a request.
+func (l listBlob) RequestDecorators() []autorest.PrepareDecorator {
+	return []autorest.PrepareDecorator{
+		autorest.WithBaseURL(l.env.Config.Resource),
+		autorest.WithPath("api/v1.0"),
+		autorest.WithPath(l.cursor.tenantID),
+		autorest.WithPath("activity/feed/subscriptions/content"),
+		autorest.WithQueryParameters(
+			map[string]interface{}{
+				"contentType": l.cursor.contentType,
+				"startTime":   l.startTime.Format(apiDateFormat),
+				"endTime":     l.endTime.Format(apiDateFormat),
+			}),
+	}
+}
+
+// OnResponse handles the output of a list content request.
+func (l listBlob) OnResponse(response *http.Response) (actions []poll.Action) {
+	if response.StatusCode != 200 {
+		return l.handleError(response)
+	}
+
+	if delta := getServerTimeDelta(response); l.env.Config.AdjustClockWarn && !inRange(delta, l.env.Config.AdjustClockMinDifference) {
+		l.env.Logger.Warnf("Server clock is offset by %v: Check system clock to avoid event loss.", delta)
+	}
+
+	var list []content
+	if err := readJSONBody(response, &list); err != nil {
+		return []poll.Action{
+			poll.Terminate(err),
+		}
+	}
+
+	// Sort content by creation date and then by ID.
+	sort.Slice(list, func(i, j int) bool {
+		return list[i].Created.Before(list[j].Created) || (list[i].Created == list[j].Created && list[i].ID < list[j].ID)
+	})
+
+	// Save in the cursor the startTime that was used to obtain these blobs.
+	// In case of resuming retrieval using that cursor, it will be necessary to
+	// use the same startTime to observe the same blobs. Otherwise there's the
+	// risk of observing partial blobs.
+	l.cursor = l.cursor.WithStartTime(l.startTime)
+
+	for _, entry := range list {
+		// Only fetch blobs that advance the cursor.
+		if l.cursor.TryAdvance(entry) {
+			l.env.Logger.Debugf("+ fetch blob date:%v id:%s", entry.Created.UTC(), entry.ID)
+			actions = append(actions, poll.Fetch(
+				ContentBlob(entry.URI, l.cursor, l.env).
+					WithID(entry.ID).
+					WithSkipLines(l.cursor.line)))
+		} else {
+			l.env.Logger.Debugf("- skip blob date:%v id:%s", entry.Created.UTC(), entry.ID)
+		}
+		if entry.Created.Before(l.startTime) {
+			l.env.Logger.Errorf("! Event created before query")
+		}
+		if entry.Created.After(l.endTime) {
+			l.env.Logger.Errorf("! Event created after query")
+		}
+	}
+	// Fetch the next page if a NextPageUri header is found.
+	if url, found := getNextPage(response); found {
+		return append(actions, poll.Fetch(newPager(url, l)))
+	}
+	// Otherwise fetch the next time window.
+	return append(actions, poll.Fetch(l.Next()))
+}
+
+// Next returns a listBlob that will fetch events in the future.
+func (l listBlob) Next() listBlob {
+	return l.adjustTimes(l.endTime)
+}
+
+var fatalErrors = map[string]struct{}{
+	// Missing parameter: {0}.
+	"AF20001": {},
+	// Invalid parameter type: {0}. Expected type: {1}
+	"AF20002": {},
+	// Expiration {0} provided is set to past date and time.
+	"AF20003": {},
+	// The tenant ID passed in the URL ({0}) does not match the tenant ID passed in the access token ({1}).
+	"AF20010": {},
+	// Specified tenant ID ({0}) does not exist in the system or has been deleted.
+	"AF20011": {},
+	// Specified tenant ID ({0}) is incorrectly configured in the system.
+	"AF20012": {},
+	// The tenant ID passed in the URL ({0}) is not a valid GUID.
+	"AF20013": {},
+	// The specified content type is not valid.
+	"AF20020": {},
+	// The webhook endpoint ({0}) could not be validated. {1}
+	"AF20021": {},
+}
+
+func (l listBlob) handleError(response *http.Response) (actions []poll.Action) {
+	var msg apiError
+	readJSONBody(response, &msg)
+	l.env.Logger.Warnf("Got error %s: %+v", response.Status, msg)
+	l.delay = l.env.Config.ErrorRetryInterval
+
+	switch response.StatusCode {
+	case 401:
+		// Authentication error. Renew oauth token and repeat this op.
+		l.delay = l.env.Config.PollInterval
+		return []poll.Action{
+			poll.RenewToken(),
+			poll.Fetch(l),
+		}
+	case 408, 503:
+		// Known errors when the backend is down.
+		// Repeat the request without reporting an error.
+		return []poll.Action{
+			poll.Fetch(l),
+		}
+	}
+
+	if _, found := fatalErrors[msg.Error.Code]; found {
+		return []poll.Action{
+			l.env.ReportAPIError(msg),
+			poll.Terminate(errors.New(msg.Error.Message)),
+		}
+	}
+
+	switch msg.Error.Code {
+	// AF20022: No subscription found for the specified content type
+	// AF20023: The subscription was disabled by [..]
+	case "AF20022", "AF20023":
+		l.delay = 0
+		// Subscribe and retry
+		return []poll.Action{
+			poll.Fetch(Subscribe(l.env)),
+			poll.Fetch(l),
+		}
+	// AF20030: Start time and end time must both be specified (or both omitted) and must
+	//          be less than or equal to 24 hours apart, with the start time no more than
+	//          7 days in the past.
+	// AF20055: (Same).
+	case "AF20030", "AF20055":
+		// As of writing this, the server fails a request if it's more than
+		// retention_time(7d)+1h in the past.
+		// On the other hand, requests can be days into the future without error.
+
+		// First check if this is caused by a request close to the max retention
+		// period that's been queued for hours because of the server being down.
+		// Repeat the request with updated times.
+		now := l.env.Clock()
+		delta := now.Sub(l.startTime)
+		if delta > (l.env.Config.MaxRetention + 30*time.Minute) {
+			l.delay = l.env.Config.PollInterval
+			return []poll.Action{
+				poll.Fetch(l.adjustTimes(l.startTime)),
+			}
+		}
+
+		delta = getServerTimeDelta(response)
+		l.env.Logger.Errorf("Server is complaining about the query interval. "+
+			"This is usually a problem with the local clock and the server's clock "+
+			"being out of sync. Time difference with server is %v.", delta)
+		if l.env.Config.AdjustClock && !inRange(delta, l.env.Config.AdjustClockMinDifference) {
+			l.env.Clock = func() time.Time {
+				return time.Now().Add(delta)
+			}
+			l.env.Logger.Info("Compensating for time difference")
+		} else {
+			l.env.Logger.Infow("Not adjusting for time offset.",
+				"api.adjust_clock", l.env.Config.AdjustClock,
+				"api.adjust_clock_min_difference", l.env.Config.AdjustClockMinDifference,
+				"difference", delta)
+		}
+		return []poll.Action{
+			poll.Fetch(l.adjustTimes(l.startTime)),
+		}
+
+	// Too many requests.
+	case "AF429":
+
+	// Internal server error. Retry the request.
+	case "AF50000":
+
+	// Invalid nextPage Input: {0}. Can be ignored.
+	case "AF20031":
+
+	// AF50005-AF50006: An internal error occurred. Retry the request.
+	case "AF50005", "AF50006":
+		return append(actions, poll.Fetch(l))
+	}
+
+	if msg.Error.Code != "" {
+		actions = append(actions, l.env.ReportAPIError(msg))
+	}
+	return append(actions, poll.Fetch(l))
+}
+
+func readJSONBody(response *http.Response, dest interface{}) error {
+	defer autorest.Respond(response,
+		autorest.ByDiscardingBody(),
+		autorest.ByClosing())
+	body, err := ioutil.ReadAll(response.Body)
+	if err != nil {
+		return errors.Wrap(err, "reading body failed")
+	}
+	if err = json.Unmarshal(body, dest); err != nil {
+		return errors.Wrap(err, "decoding json failed")
+	}
+	return nil
+}
+
+func getServerTimeDelta(response *http.Response) time.Duration {
+	serverDate, err := httpDateFormats.Parse(response.Header.Get("Date"))
+	if err != nil {
+		return 0
+	}
+	return serverDate.Sub(time.Now())
+}
diff --git a/x-pack/filebeat/input/o365audit/listblobs_test.go b/x-pack/filebeat/input/o365audit/listblobs_test.go
new file mode 100644
index 000000000000..148ee2273e83
--- /dev/null
+++ b/x-pack/filebeat/input/o365audit/listblobs_test.go
@@ -0,0 +1,413 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+ +package o365audit + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/Azure/go-autorest/autorest" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" +) + +const contentType = "Audit.AzureActiveDirectory" + +var now = time.Now().UTC() + +type blob struct { + Created time.Time `json:"contentCreated"` + Expiration time.Time `json:"contentExpiration"` + Id string `json:"contentId"` + Type string `json:"contentType"` + Uri string `json:"contentUri"` +} + +func idDate(d time.Time) string { + return strings.ReplaceAll(d.Format("20060102150405.999999999"), ".", "") +} + +func makeBlob(c time.Time, path string) blob { + created := c.UTC() + id := fmt.Sprintf("%s$%s$%s$%s$emea0026", + idDate(created), + idDate(created.Add(time.Hour)), + strings.ReplaceAll(strings.ToLower(contentType), ".", "_"), + strings.ReplaceAll(contentType, ".", "_")) + return blob{ + Created: created, + Expiration: created.Add(time.Hour * 24 * 7), + Id: id, + Type: contentType, + Uri: "https://test.localhost/" + path, + } +} + +type fakePoll struct { + queue []poll.Transaction +} + +func (f *fakePoll) RenewToken() error { + return nil +} + +func (f *fakePoll) Enqueue(item poll.Transaction) error { + f.queue = append(f.queue, item) + return nil +} + +func (f *fakePoll) PagedSearchQuery(t testing.TB, lb poll.Transaction, db []blob) (urls []string, next poll.Transaction) { + const pageSize = 3 + n := len(db) + var from, to int + switch v := lb.(type) { + case listBlob: + from = 0 + case paginator: + req, err := autorest.Prepare(&http.Request{}, v.RequestDecorators()...) + if !assert.NoError(t, err) { + t.Fatal(err) + } + nextArray, ok := req.URL.Query()["nextPage"] + if !assert.True(t, ok) || len(nextArray) != 1 { + t.Fatal("nextPage param is missing in pager query") + } + from, err = strconv.Atoi(nextArray[0]) + if !assert.NoError(t, err) { + t.Fatal(err) + } + } + if to = from + pageSize; to > n { + to = n + } + result := db[from:to] + nextUrl := "" + if to < n { + nextUrl = fmt.Sprintf("http://localhost.test/something?nextPage=%d", to) + } + return f.deliverResult(t, lb, result, nextUrl) +} + +func (f *fakePoll) deliverResult(t testing.TB, pl poll.Transaction, msg interface{}, nextUrl string) (urls []string, next poll.Transaction) { + js, err := json.Marshal(msg) + if !assert.NoError(t, err) { + t.Fatal(err) + } + response := &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewReader(js)), + ContentLength: int64(len(js)), + } + if nextUrl != "" { + response.Header = http.Header{ + "NextPageUri": []string{nextUrl}, + } + } + return f.finishQuery(t, pl, response) +} + +func (f *fakePoll) SearchQuery(t testing.TB, lb listBlob, db []blob) (urls []string, next poll.Transaction) { + t.Log("Query start:", now.Sub(lb.startTime), "end:", now.Sub(lb.endTime)) + lowerBound := sort.Search(len(db), func(i int) bool { + return !db[i].Created.Before(lb.startTime) + }) + upperBound := sort.Search(len(db), func(i int) bool { + return !db[i].Created.Before(lb.endTime) + }) + result := db[lowerBound:upperBound] + return f.deliverResult(t, lb, result, "") +} + +func (f *fakePoll) finishQuery(t testing.TB, pl poll.Transaction, resp *http.Response) (urls []string, next poll.Transaction) { + for _, a := range pl.OnResponse(resp) { + if err := a(f); !assert.NoError(t, err) { + t.Fatal(err) + } + } + if n := len(f.queue); n > 
0 { + urls = make([]string, n-1) + for i := 0; i < n-1; i++ { + req, err := autorest.Prepare(&http.Request{}, f.queue[i].RequestDecorators()...) + if !assert.NoError(t, err) { + t.Fatal(err) + } + urls[i] = req.URL.Path[1:] + } + next = f.queue[n-1] + } + f.queue = nil + return urls, next +} + +func (f *fakePoll) subscriptionError(t testing.TB, lb listBlob) (subscribe, listBlob) { + t.Log("Query start:", now.Sub(lb.startTime), "end:", now.Sub(lb.endTime)) + var apiErr apiError + apiErr.Error.Code = "AF20022" + apiErr.Error.Message = "No subscription found for the specified content type" + js, err := json.Marshal(apiErr) + if !assert.NoError(t, err) { + t.Fatal(err) + } + t.Log(string(js)) + resp := &http.Response{ + StatusCode: 400, + Body: ioutil.NopCloser(bytes.NewReader(js)), + } + for _, a := range lb.OnResponse(resp) { + if err := a(f); !assert.NoError(t, err) { + t.Fatal(err) + } + } + if !assert.Len(t, f.queue, 2) { + t.Fatal("need 2 actions") + } + if !assert.IsType(t, subscribe{}, f.queue[0]) { + t.Fatal("expected type not found") + } + if !assert.IsType(t, lb, f.queue[1]) { + t.Fatal("expected type not found") + } + return f.queue[0].(subscribe), f.queue[1].(listBlob) +} + +func testConfig() apiEnvironment { + logp.TestingSetup() + config := defaultConfig() + return apiEnvironment{ + Config: config.API, + Logger: logp.NewLogger(inputName + " test"), + Clock: func() time.Time { + return now + }, + } +} + +func TestListBlob(t *testing.T) { + ctx := testConfig() + + db := []blob{ + // 7d+ ago + makeBlob(now.Add(-time.Hour*(1+24*7)), "expired"), + // [7,6d) ago + makeBlob(now.Add(-time.Hour*(8+24*6)), "day1_1"), + makeBlob(now.Add(-time.Hour*(3+24*6)), "day1_2"), + // [6d,5d) ago + makeBlob(now.Add(-time.Hour*(3+24*5)), "day2_1"), + + // [5d-4d) ago + makeBlob(now.Add(-time.Hour*(24*5)), "day3_1_limit"), + makeBlob(now.Add(-time.Hour*(23+24*4)), "day3_2"), + // Yesterday + makeBlob(now.Add(-time.Hour*(12+24*1)), "day6"), + // Today + makeBlob(now.Add(-time.Hour*12), "today_1"), + makeBlob(now.Add(-time.Hour*7), "today_2"), + } + lb := ListBlob(newCursor(stream{"1234", contentType}, time.Time{}), ctx) + var f fakePoll + // 6 days ago + blobs, next := f.SearchQuery(t, lb, db) + assert.Equal(t, []string{"day1_1", "day1_2"}, blobs) + assert.IsType(t, listBlob{}, next) + // 5 days ago + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"day2_1"}, blobs) + + // 4 days ago + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"day3_1_limit", "day3_2"}, blobs) + + // 3 days ago + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) + + // 2 days ago + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) + + // Yesterday + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"day6"}, blobs) + + // Today + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"today_1", "today_2"}, blobs) + + // Query for new data + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) + + // New blob + db = append(db, makeBlob(now.Add(-time.Hour*5), "live_1")) + + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"live_1"}, blobs) + + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) + + // Two new blobs + db = append(db, makeBlob(now.Add(-time.Hour*5+time.Second), "live_2")) + db = append(db, makeBlob(now.Add(-time.Hour*5+2*time.Second), "live_3")) + + blobs, next = 
f.SearchQuery(t, next.(listBlob), db)
+ assert.Equal(t, []string{"live_2", "live_3"}, blobs)
+
+ blobs, next = f.SearchQuery(t, next.(listBlob), db)
+ assert.Empty(t, blobs)
+
+ blobs, next = f.SearchQuery(t, next.(listBlob), db)
+ assert.Empty(t, blobs)
+
+ // Two more blobs sharing the same timestamp. It's unclear whether the API
+ // can return this, but if it does, the blobs will still have distinct IDs:
+ // the ID encodes the timestamp with nanosecond precision, while the
+ // contentCreated date only carries millisecond precision.
+ db = append(db, makeBlob(now.Add(-time.Hour*3+time.Nanosecond), "live_4a"))
+ db = append(db, makeBlob(now.Add(-time.Hour*3+2*time.Nanosecond), "live_4b"))
+ db = append(db, makeBlob(now.Add(-time.Hour*3+3*time.Nanosecond), "live_4c"))
+
+ blobs, next = f.SearchQuery(t, next.(listBlob), db)
+ assert.Equal(t, []string{"live_4a", "live_4b", "live_4c"}, blobs)
+
+ blobs, next = f.SearchQuery(t, next.(listBlob), db)
+ assert.Empty(t, blobs)
+}
+
+func TestSubscriptionStart(t *testing.T) {
+ logp.TestingSetup()
+ log := logp.L()
+ ctx := apiEnvironment{
+ ContentType: contentType,
+ TenantID: "1234",
+ Logger: log,
+ Clock: func() time.Time {
+ return now
+ },
+ }
+ lb := ListBlob(newCursor(stream{"1234", contentType}, time.Time{}), ctx)
+ var f fakePoll
+ s, l := f.subscriptionError(t, lb)
+ assert.Equal(t, lb.cursor, l.cursor)
+ assert.Equal(t, lb.endTime, l.endTime)
+ assert.Equal(t, lb.startTime, l.startTime)
+ assert.Equal(t, lb.delay, l.delay)
+ assert.Equal(t, lb.env.TenantID, l.env.TenantID)
+ assert.Equal(t, lb.env.ContentType, l.env.ContentType)
+ assert.Equal(t, lb.env.Logger, l.env.Logger)
+ assert.Equal(t, contentType, s.ContentType)
+ assert.Equal(t, lb.cursor.tenantID, s.TenantID)
+}
+
+func TestPagination(t *testing.T) {
+ ctx := testConfig()
+ db := []blob{
+ makeBlob(now.Add(-time.Hour*47+1*time.Nanosecond), "e1"),
+ makeBlob(now.Add(-time.Hour*47+2*time.Nanosecond), "e2"),
+ makeBlob(now.Add(-time.Hour*47+3*time.Nanosecond), "e3"),
+ makeBlob(now.Add(-time.Hour*47+4*time.Nanosecond), "e4"),
+ makeBlob(now.Add(-time.Hour*47+5*time.Nanosecond), "e5"),
+ makeBlob(now.Add(-time.Hour*47+6*time.Nanosecond), "e6"),
+ makeBlob(now.Add(-time.Hour*47+7*time.Nanosecond), "e7"),
+ makeBlob(now.Add(-time.Hour*47+8*time.Nanosecond), "e8"),
+ }
+ lb := ListBlob(newCursor(stream{"1234", contentType}, now.Add(-time.Hour*48)), ctx)
+ var f fakePoll
+ // First page of blobs created two days ago.
+ blobs, next := f.PagedSearchQuery(t, lb, db)
+ assert.Equal(t, []string{"e1", "e2", "e3"}, blobs)
+ assert.IsType(t, paginator{}, next)
+
+ blobs, next = f.PagedSearchQuery(t, next, db)
+ assert.Equal(t, []string{"e4", "e5", "e6"}, blobs)
+ assert.IsType(t, paginator{}, next)
+
+ blobs, next = f.PagedSearchQuery(t, next, db)
+ assert.Equal(t, []string{"e7", "e8"}, blobs)
+ nextlb, ok := next.(listBlob)
+ if !assert.True(t, ok) {
+ t.Fatal("bad type after pagination")
+ }
+ assert.Equal(t, lb.endTime, nextlb.startTime)
+ assert.True(t, lb.endTime.Before(nextlb.endTime))
+}
+
+func mkTime(t testing.TB, str string) time.Time {
+ tm, err := time.Parse(apiDateFormat, str)
+ if !assert.NoError(t, err) {
+ t.Fatal(err)
+ }
+ return tm
+}
+
+func TestAdvance(t *testing.T) {
+ start := mkTime(t, "2020-02-01T15:00:00")
+ ev1 := mkTime(t, "2020-02-02T12:00:00")
+ now1 := mkTime(t, "2020-02-03T00:00:00")
+ ev2 := mkTime(t, "2020-02-03T12:00:00")
+ now2 := mkTime(t, "2020-02-04T00:00:00")
+ now3 := mkTime(t, "2020-02-06T00:00:00")
+ db := []blob{
+ makeBlob(ev1, "e1"),
+ makeBlob(ev2, "e2"),
+ }
+ now := &now1
+ ctx := testConfig()
+ ctx.Clock = func() time.Time {
+ return *now
+ }
+ lb := ListBlob(newCursor(stream{"tenant", contentType}, start), ctx)
+ assert.Equal(t, start, lb.startTime)
+ assert.Equal(t, start.Add(time.Hour*24), lb.endTime)
+ assert.True(t, lb.endTime.Before(now1))
+ var f fakePoll
+ blobs, next := f.SearchQuery(t, lb, db)
+ assert.Equal(t, []string{"e1"}, blobs)
+ assert.IsType(t, listBlob{}, next)
+ lb = next.(listBlob)
+ assert.Equal(t, ev1, lb.startTime)
+ assert.Equal(t, now1, lb.endTime)
+
+ now = &now2
+ blobs, next = f.SearchQuery(t, lb, db)
+ assert.Empty(t, blobs)
+ assert.IsType(t, listBlob{}, next)
+ lb = next.(listBlob)
+ assert.Equal(t, now1, lb.startTime)
+ assert.Equal(t, now2, lb.endTime)
+
+ blobs, next = f.SearchQuery(t, lb, db)
+ assert.Equal(t, []string{"e2"}, blobs)
+ assert.IsType(t, listBlob{}, next)
+ lb = next.(listBlob)
+ assert.Equal(t, ev1.Add(time.Hour*24), lb.startTime)
+ assert.Equal(t, now2, lb.endTime)
+
+ now = &now3
+ blobs, next = f.SearchQuery(t, lb, db)
+ assert.Empty(t, blobs)
+ assert.IsType(t, listBlob{}, next)
+ lb = next.(listBlob)
+ assert.Equal(t, now2, lb.startTime)
+ assert.Equal(t, now2.Add(time.Hour*24), lb.endTime)
+
+ blobs, next = f.SearchQuery(t, lb, db)
+ assert.Empty(t, blobs)
+ assert.IsType(t, listBlob{}, next)
+ lb = next.(listBlob)
+ assert.Equal(t, now2.Add(time.Hour*24), lb.startTime)
+ assert.Equal(t, now3, lb.endTime)
+}
diff --git a/x-pack/filebeat/input/o365audit/pagination.go b/x-pack/filebeat/input/o365audit/pagination.go
new file mode 100644
index 000000000000..10703a0479ab
--- /dev/null
+++ b/x-pack/filebeat/input/o365audit/pagination.go
@@ -0,0 +1,65 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package o365audit
+
+import (
+ "fmt"
+ "net/http"
+ "time"
+
+ "github.com/Azure/go-autorest/autorest"
+
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll"
+)
+
+// paginator is a decorator around a poll.Transaction that requests the next page of a paginated response.
+type paginator struct {
+ url string
+ inner poll.Transaction
+}
+
+// String returns the printable representation of this transaction.
+func (p paginator) String() string {
+ return fmt.Sprintf("pager for url:`%s` inner:%s", p.url, p.inner)
+}
+
+// RequestDecorators returns the decorators used to perform a request.
+func (p paginator) RequestDecorators() []autorest.PrepareDecorator {
+ return []autorest.PrepareDecorator{
+ autorest.WithBaseURL(p.url),
+ }
+}
+
+// OnResponse delegates response handling to the wrapped transaction.
+func (p paginator) OnResponse(r *http.Response) []poll.Action {
+ return p.inner.OnResponse(r)
+}
+
+// Delay returns the delay for the wrapped transaction.
+func (p paginator) Delay() time.Duration {
+ return p.inner.Delay()
+}
+
+func newPager(pageUrl string, inner poll.Transaction) poll.Transaction {
+ return paginator{
+ url: pageUrl,
+ inner: inner,
+ }
+}
+
+// The documentation mentions NextPageUri, but shows NextPageUrl in the examples.
+var nextPageHeaders = []string{
+ "NextPageUri",
+ "NextPageUrl",
+}
+
+func getNextPage(response *http.Response) (url string, found bool) {
+ for _, h := range nextPageHeaders {
+ if urls, found := response.Header[h]; found && len(urls) > 0 {
+ return urls[0], true
+ }
+ }
+ return "", false
+}
diff --git a/x-pack/filebeat/input/o365audit/poll/poll.go b/x-pack/filebeat/input/o365audit/poll/poll.go
new file mode 100644
index 000000000000..e68f0f54c8f8
--- /dev/null
+++ b/x-pack/filebeat/input/o365audit/poll/poll.go
@@ -0,0 +1,268 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package poll
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "time"
+
+ "github.com/Azure/go-autorest/autorest"
+ "github.com/pkg/errors"
+
+ "github.com/elastic/beats/v7/libbeat/logp"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/auth"
+)
+
+// Transaction is the interface that wraps a request-response transaction to be
+// performed by the poller.
+type Transaction interface {
+ fmt.Stringer
+
+ // RequestDecorators must return the list of decorators used to customize
+ // an http.Request.
+ RequestDecorators() []autorest.PrepareDecorator
+
+ // OnResponse receives the resulting http.Response and returns the actions
+ // to be performed.
+ OnResponse(*http.Response) []Action
+
+ // Delay returns the required delay before performing the request.
+ Delay() time.Duration
+}
+
+// Poller encapsulates a single-threaded polling loop that performs requests
+// and executes actions in response.
+type Poller struct {
+ decorators []autorest.PrepareDecorator // Fixed decorators to apply to each request.
+ log *logp.Logger
+ tp auth.TokenProvider
+ list transactionList // List of pending transactions.
+ interval time.Duration // Minimum interval between transactions.
+ ctx context.Context
+}
+
+// New creates a new Poller.
+func New(options ...PollerOption) (p *Poller, err error) {
+ p = &Poller{
+ ctx: context.Background(),
+ }
+ for _, opt := range options {
+ if err = opt(p); err != nil {
+ return nil, err
+ }
+ }
+ return p, nil
+}
+
+// Run starts the poll loop with the given first transaction and continues with
+// any transactions spawned by it. It executes until an error occurs, a
+// Terminate action is returned by a transaction, it runs out of transactions
+// to perform, or a context set using WithContext() is done.
+func (r *Poller) Run(item Transaction) error {
+ r.list.push(item)
+ for r.ctx.Err() == nil {
+ transaction := r.list.pop()
+ if transaction == nil {
+ return nil
+ }
+ if err := r.fetch(transaction); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+func (r *Poller) fetch(item Transaction) error {
+ return r.fetchWithDelay(item, r.interval)
+}
+
+func (r *Poller) fetchWithDelay(item Transaction, minDelay time.Duration) error {
+ r.log.Debugf("* Fetch %s", item)
+ // The order here is important. item's decorators must come first as those
+ // set the URL, which is required by other decorators (WithQueryParameters).
+ decorators := append(
+ append([]autorest.PrepareDecorator{}, item.RequestDecorators()...),
+ r.decorators...)
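+ // When a token provider is configured, a bearer token obtained from it is
+ // attached to every request, after the fixed decorators.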
+ if r.tp != nil { + token, err := r.tp.Token() + if err != nil { + return errors.Wrap(err, "failed getting a token") + } + decorators = append(decorators, autorest.WithBearerAuthorization(token)) + } + + request, err := autorest.Prepare(&http.Request{}, decorators...) + if err != nil { + return errors.Wrap(err, "failed preparing request") + } + delay := max(item.Delay(), minDelay) + r.log.Debugf(" -- wait %s for %s", delay, request.URL.String()) + + response, err := autorest.Send(request, + autorest.DoCloseIfError(), + autorest.AfterDelay(delay)) + if err != nil { + r.log.Warnf("-- error sending request: %v", err) + return r.fetchWithDelay(item, max(time.Minute, r.interval)) + } + + acts := item.OnResponse(response) + r.log.Debugf(" <- Result (%s) #acts=%d", response.Status, len(acts)) + + for _, act := range acts { + if err = act(r); err != nil { + return errors.Wrapf(err, "error acting on %+v", act) + } + } + + return nil +} + +// Logger returns the logger used. +func (p *Poller) Logger() *logp.Logger { + return p.log +} + +// PollerOption is the type for additional configuration options for a Poller. +type PollerOption func(r *Poller) error + +// WithRequestDecorator sets additional request decorators that will be applied +// to all requests. +func WithRequestDecorator(decorators ...autorest.PrepareDecorator) PollerOption { + return func(r *Poller) error { + r.decorators = append(r.decorators, decorators...) + return nil + } +} + +// WithTokenProvider sets the token provider that will be used to set a bearer +// token to all requests. +func WithTokenProvider(tp auth.TokenProvider) PollerOption { + return func(r *Poller) error { + if r.tp != nil { + return errors.New("tried to set more than one token provider") + } + r.tp = tp + return nil + } +} + +// WithLogger sets the logger to use. +func WithLogger(logger *logp.Logger) PollerOption { + return func(r *Poller) error { + r.log = logger + return nil + } +} + +// WithContext sets the context used to terminate the poll loop. +func WithContext(ctx context.Context) PollerOption { + return func(r *Poller) error { + r.ctx = ctx + return nil + } +} + +// WithMinRequestInterval sets the minimum delay between requests. +func WithMinRequestInterval(d time.Duration) PollerOption { + return func(r *Poller) error { + r.interval = d + return nil + } +} + +type listItem struct { + item Transaction + next *listItem +} + +type transactionList struct { + head *listItem + tail *listItem + size uint +} + +func (p *transactionList) push(item Transaction) { + li := &listItem{ + item: item, + } + if p.head != nil { + p.tail.next = li + } else { + p.head = li + } + p.tail = li + p.size++ +} + +func (p *transactionList) pop() Transaction { + item := p.head + if item == nil { + return nil + } + p.head = item.next + if p.head == nil { + p.tail = nil + } + p.size-- + return item.item +} + +// Enqueuer is the interface provided to actions so they can act on a Poller. +type Enqueuer interface { + Enqueue(item Transaction) error + RenewToken() error +} + +// Action is an operation returned by a transaction. +type Action func(q Enqueuer) error + +// Enqueue adds a new transaction to the queue. +func (r *Poller) Enqueue(item Transaction) error { + r.list.push(item) + return nil +} + +// RenewToken renews the token provider's master token in the case of an +// authorization error. 
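+// It fails when the Poller was built without a token provider.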
+func (r *Poller) RenewToken() error {
+ if r.tp == nil {
+ return errors.New("can't renew token: no token provider set")
+ }
+ return r.tp.Renew()
+}
+
+// Terminate action causes the poll loop to finish with the given error.
+func Terminate(err error) Action {
+ return func(Enqueuer) error {
+ if err == nil {
+ return errors.New("polling terminated without a specific error")
+ }
+ return errors.Wrap(err, "polling terminated due to error")
+ }
+}
+
+// Fetch action will add an element to the transaction queue.
+func Fetch(item Transaction) Action {
+ return func(q Enqueuer) error {
+ return q.Enqueue(item)
+ }
+}
+
+// RenewToken will renew the token provider's master token in the case of an
+// authorization error.
+func RenewToken() Action {
+ return func(q Enqueuer) error {
+ return q.RenewToken()
+ }
+}
+
+func max(a, b time.Duration) time.Duration {
+ if a < b {
+ return b
+ }
+ return a
+}
diff --git a/x-pack/filebeat/input/o365audit/schema.go b/x-pack/filebeat/input/o365audit/schema.go
new file mode 100644
index 000000000000..77519a8e9539
--- /dev/null
+++ b/x-pack/filebeat/input/o365audit/schema.go
@@ -0,0 +1,66 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package o365audit
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/common"
+)
+
+type apiError struct {
+ Error struct {
+ Code string `json:"code"`
+ Message string `json:"message"`
+ } `json:"error"`
+}
+
+func (e apiError) getErrorStrings() (code, msg string) {
+ const none = "(none)"
+ code, msg = e.Error.Code, e.Error.Message
+ if len(code) == 0 {
+ code = none
+ }
+ if len(msg) == 0 {
+ msg = none
+ }
+ return
+}
+
+func (e apiError) String() string {
+ code, msg := e.getErrorStrings()
+ return fmt.Sprintf("api error:%s %s", code, msg)
+}
+
+// ToBeatEvent returns a beat.Event representing the API error.
+func (e apiError) ToBeatEvent() beat.Event {
+ code, msg := e.getErrorStrings()
+ return beat.Event{
+ Timestamp: time.Now(),
+ Fields: common.MapStr{
+ "error": common.MapStr{
+ "code": code,
+ "message": msg,
+ },
+ "event": common.MapStr{
+ "kind": "pipeline_error",
+ },
+ },
+ }
+}
+
+type content struct {
+ Type string `json:"contentType"`
+ ID string `json:"contentId"`
+ URI string `json:"contentUri"`
+ Created time.Time `json:"contentCreated"`
+ Expiration time.Time `json:"contentExpiration"`
+}
+
+type subscribeResponse struct {
+ Status string `json:"status"`
+}
diff --git a/x-pack/filebeat/input/o365audit/state.go b/x-pack/filebeat/input/o365audit/state.go
new file mode 100644
index 000000000000..ecdb8fc89ff9
--- /dev/null
+++ b/x-pack/filebeat/input/o365audit/state.go
@@ -0,0 +1,158 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package o365audit
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+)
+
+var errNoUpdate = errors.New("new cursor doesn't advance past the existing cursor")
+
+// stream represents an event stream: a (tenant ID, content type) pair.
+type stream struct {
+ tenantID, contentType string
+}
+
+// A cursor represents a point in time within an event stream
+// that can be persisted and used to resume processing from that point.
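+// Cursors for the same stream are ordered by blob creation time first and
+// by line number within the blob second (see Before).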
+type cursor struct {
+ // Identifier for the event stream.
+ stream
+
+ // timestamp is the creation time of the last seen blob.
+ timestamp time.Time
+ // line is the number of events already processed within the current blob.
+ line int
+ // startTime used in the last list content query.
+ // This is necessary to ensure that the same blobs are observed.
+ startTime time.Time
+}
+
+// newCursor creates a cursor for the given stream, starting at time ts.
+func newCursor(s stream, ts time.Time) cursor {
+ return cursor{
+ stream: s,
+ timestamp: ts,
+ }
+}
+
+// TryAdvance advances the cursor to the given content blob
+// unless it lies in the past.
+// It returns whether the given content needs to be processed.
+func (c *cursor) TryAdvance(ct content) bool {
+ if ct.Created.Before(c.timestamp) {
+ return false
+ }
+ if ct.Created.Equal(c.timestamp) {
+ // Only need to re-process the current content blob if we're
+ // seeking to a line inside it.
+ return c.line > 0
+ }
+ c.timestamp = ct.Created
+ c.line = 0
+ return true
+}
+
+// Before reports whether cursor c precedes b; it is used to decide whether a new cursor needs to be persisted.
+func (c cursor) Before(b cursor) bool {
+ if c.contentType != b.contentType || c.tenantID != b.tenantID {
+ panic(fmt.Sprintf("assertion failed: %+v vs %+v", c, b))
+ }
+
+ if c.timestamp.Before(b.timestamp) {
+ return true
+ }
+ if c.timestamp.Equal(b.timestamp) {
+ return c.line < b.line
+ }
+ return false
+}
+
+// WithStartTime returns a copy of the cursor with the given startTime.
+func (c cursor) WithStartTime(s time.Time) cursor {
+ c.startTime = s
+ return c
+}
+
+// ForNextLine returns a new cursor for the next line within a blob.
+func (c cursor) ForNextLine() cursor {
+ c.line++
+ return c
+}
+
+// String returns the printable representation of a cursor.
+func (c cursor) String() string {
+ return fmt.Sprintf("cursor{tenantID:%s contentType:%s timestamp:%s line:%d start:%s}",
+ c.tenantID, c.contentType, c.timestamp, c.line, c.startTime)
+}
+
+// errStateNotFound is the error returned by a statePersister when a cursor
+// is not found for a stream.
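+// stateStorage reacts to it by starting over from an empty cursor instead of
+// treating it as a hard failure.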
+var errStateNotFound = errors.New("no saved state found") + +type statePersister interface { + Load(key stream) (cursor, error) + Save(cursor cursor) error +} + +type stateStorage struct { + sync.Mutex + saved map[stream]cursor + persister statePersister +} + +func (s *stateStorage) Load(key stream) (cursor, error) { + s.Lock() + defer s.Unlock() + if st, found := s.saved[key]; found { + return st, nil + } + cur, err := s.persister.Load(key) + if err != nil { + if err != errStateNotFound { + return cur, err + } + cur = newCursor(key, time.Time{}) + } + return cur, s.saveUnsafe(cur) +} + +func (s *stateStorage) Save(c cursor) error { + s.Lock() + defer s.Unlock() + return s.saveUnsafe(c) +} + +func (s *stateStorage) saveUnsafe(c cursor) error { + if prev, found := s.saved[c.stream]; found { + if !prev.Before(c) { + return errNoUpdate + } + } + if s.saved == nil { + s.saved = make(map[stream]cursor) + } + s.saved[c.stream] = c + return s.persister.Save(c) +} + +func newStateStorage(underlying statePersister) *stateStorage { + return &stateStorage{ + persister: underlying, + } +} + +type noopPersister struct{} + +func (p noopPersister) Load(key stream) (cursor, error) { + return cursor{}, errStateNotFound +} + +func (p noopPersister) Save(cursor cursor) error { + return nil +} diff --git a/x-pack/filebeat/input/o365audit/state_test.go b/x-pack/filebeat/input/o365audit/state_test.go new file mode 100644 index 000000000000..71b778d16ecb --- /dev/null +++ b/x-pack/filebeat/input/o365audit/state_test.go @@ -0,0 +1,105 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+
+package o365audit
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNoopState(t *testing.T) {
+ const (
+ ct = "content-type"
+ tn = "my_tenant"
+ )
+ myStream := stream{tn, ct}
+ t.Run("new state", func(t *testing.T) {
+ st := newStateStorage(noopPersister{})
+ cur, err := st.Load(myStream)
+ if !assert.NoError(t, err) {
+ t.Fatal(err)
+ }
+ empty := newCursor(myStream, time.Time{})
+ assert.Equal(t, empty, cur)
+ })
+ t.Run("update state", func(t *testing.T) {
+ st := newStateStorage(noopPersister{})
+ cur, err := st.Load(myStream)
+ if !assert.NoError(t, err) {
+ t.Fatal(err)
+ }
+ advanced := cur.TryAdvance(content{
+ Type: ct,
+ ID: "1234",
+ URI: "http://localhost.test/my_uri",
+ Created: time.Now(),
+ Expiration: time.Now().Add(time.Hour),
+ })
+ assert.True(t, advanced)
+ err = st.Save(cur)
+ if !assert.NoError(t, err) {
+ t.Fatal(err)
+ }
+ saved, err := st.Load(myStream)
+ if !assert.NoError(t, err) {
+ t.Fatal(err)
+ }
+ assert.Equal(t, cur, saved)
+ })
+ t.Run("forbid reversal", func(t *testing.T) {
+ st := newStateStorage(noopPersister{})
+ cur := newCursor(myStream, time.Now())
+ next := cur.ForNextLine()
+ err := st.Save(next)
+ if !assert.NoError(t, err) {
+ t.Fatal(err)
+ }
+ err = st.Save(cur)
+ assert.Equal(t, errNoUpdate, err)
+ })
+ t.Run("multiple streams", func(t *testing.T) {
+ st := newStateStorage(noopPersister{})
+ cursors := []cursor{
+ newCursor(myStream, time.Time{}),
+ newCursor(stream{"tenant2", ct}, time.Time{}),
+ newCursor(stream{ct, "bananas"}, time.Time{}),
+ }
+ for idx, cur := range cursors {
+ msg := fmt.Sprintf("idx:%d cur:%+v", idx, cur)
+ err := st.Save(cur)
+ if !assert.NoError(t, err, msg) {
+ t.Fatal(err)
+ }
+ }
+ for idx, cur := range cursors {
+ msg := fmt.Sprintf("idx:%d cur:%+v", idx, cur)
+ saved, err := st.Load(cur.stream)
+ if !assert.NoError(t, err, msg) {
+ t.Fatal(err)
+ }
+ assert.Equal(t, cur, saved)
+ }
+ for idx, cur := range cursors {
+ cur = cur.ForNextLine()
+ cursors[idx] = cur
+ msg := fmt.Sprintf("idx:%d cur:%+v", idx, cur)
+ err := st.Save(cur)
+ if !assert.NoError(t, err, msg) {
+ t.Fatal(err)
+ }
+ }
+ for idx, cur := range cursors {
+ msg := fmt.Sprintf("idx:%d cur:%+v", idx, cur)
+ saved, err := st.Load(cur.stream)
+ if !assert.NoError(t, err, msg) {
+ t.Fatal(err)
+ }
+ assert.Equal(t, cur, saved)
+ }
+ })
+}
diff --git a/x-pack/filebeat/input/o365audit/subscribe.go b/x-pack/filebeat/input/o365audit/subscribe.go
new file mode 100644
index 000000000000..8077ea246221
--- /dev/null
+++ b/x-pack/filebeat/input/o365audit/subscribe.go
@@ -0,0 +1,81 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package o365audit
+
+import (
+ "fmt"
+ "net/http"
+ "time"
+
+ "github.com/Azure/go-autorest/autorest"
+
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll"
+)
+
+// subscribe is a poll.Transaction that subscribes to an event stream.
+type subscribe struct {
+ apiEnvironment
+}
+
+// String returns the printable representation of a subscribe transaction.
+func (s subscribe) String() string {
+ return fmt.Sprintf("subscribe tenant:%s contentType:%s", s.TenantID, s.ContentType)
+}
+
+// RequestDecorators returns the decorators used to perform a request.
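+// The resulting request is a POST to
+// {resource}/api/v1.0/{tenantID}/activity/feed/subscriptions/start
+// with the content type to subscribe to passed as a query parameter.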
+func (s subscribe) RequestDecorators() []autorest.PrepareDecorator {
+ return []autorest.PrepareDecorator{
+ autorest.AsPost(),
+ autorest.WithBaseURL(s.Config.Resource),
+ autorest.WithPath("api/v1.0"),
+ autorest.WithPath(s.TenantID),
+ autorest.WithPath("activity/feed/subscriptions/start"),
+ autorest.WithQueryParameters(
+ map[string]interface{}{
+ "contentType": s.ContentType,
+ }),
+ }
+}
+
+// OnResponse handles the response to a subscription start request.
+func (s subscribe) OnResponse(response *http.Response) []poll.Action {
+ if response.StatusCode != 200 {
+ return s.handleError(response)
+ }
+ var js subscribeResponse
+ if err := readJSONBody(response, &js); err != nil {
+ return []poll.Action{
+ poll.Terminate(err),
+ }
+ }
+ if js.Status != "enabled" {
+ return []poll.Action{
+ poll.Terminate(fmt.Errorf("unable to subscribe, got status: %s", js.Status)),
+ }
+ }
+ return nil
+}
+
+func (s subscribe) handleError(response *http.Response) []poll.Action {
+ var msg apiError
+ if err := readJSONBody(response, &msg); err != nil {
+ return []poll.Action{poll.Terminate(err)}
+ }
+ return []poll.Action{
+ poll.Terminate(fmt.Errorf("got an error when subscribing: %s body: %+v", response.Status, msg)),
+ }
+}
+
+// Delay returns the delay before executing a transaction.
+func (s subscribe) Delay() time.Duration {
+ return time.Second * 5
+}
+
+// Subscribe returns a transaction that subscribes to an event stream.
+func Subscribe(env apiEnvironment) subscribe {
+ return subscribe{
+ apiEnvironment: env,
+ }
+}
From 6cb711b8ab98dd55fb985a565ad2d84b48979621 Mon Sep 17 00:00:00 2001
From: Adrian Serrano
Date: Thu, 19 Mar 2020 21:44:31 +0100
Subject: [PATCH 2/3] Fix user-agent call

---
 x-pack/filebeat/input/o365audit/input.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go
index cafba2184f3a..e8a64f7c7dda 100644
--- a/x-pack/filebeat/input/o365audit/input.go
+++ b/x-pack/filebeat/input/o365audit/input.go
@@ -129,7 +129,7 @@ func newInput(
 poll.WithLogger(log.With("tenantID", tenantID, "contentType", contentType)),
 poll.WithContext(ctx),
 poll.WithRequestDecorator(
- autorest.WithUserAgent(useragent.UserAgent("Filebeat-"+inputName)),
+ autorest.WithUserAgent(useragent.UserAgent("Filebeat-"+inputName, true)),
 autorest.WithQueryParameters(common.MapStr{
 "publisherIdentifier": tenantID,
 }),
From 08bdaac49089181b05607b7501fdd55559bd51c6 Mon Sep 17 00:00:00 2001
From: Adrian Serrano
Date: Thu, 19 Mar 2020 23:27:54 +0100
Subject: [PATCH 3/3] fix changelog

---
 CHANGELOG.next.asciidoc | 1 -
 1 file changed, 1 deletion(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index a60d4459bfaa..cfc5668e64b0 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -263,7 +263,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Release ActiveMQ module as GA. {issue}17047[17047] {pull}17049[17049]
 - Improve ECS categorization field mappings in iptables module. {issue}16166[16166] {pull}16637[16637]
 - Add pattern for Cisco ASA / FTD Message 734001 {issue}16212[16212] {pull}16612[16612]
-- Allow users to override pipeline ID in fileset input config. {issue}9531[9531] {pull}16561[16561]
 - Add `o365audit` input type for consuming events from Office 365 Management Activity API. {issue}16196[16196] {pull}16244[16244]

 *Heartbeat*