diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index cb2bbfe..2445e13 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -17,4 +17,4 @@ jobs: go-version-file: 'go.mod' - name: Test - run: make test-integration GOTEST_FLAGS="-v -count=1" + run: make test GOTEST_FLAGS="-v -count=1" diff --git a/Makefile b/Makefile index 917b46f..8d89d3b 100644 --- a/Makefile +++ b/Makefile @@ -8,13 +8,6 @@ build: test: go test $(GOTEST_FLAGS) -race ./... -test-integration: - # run required docker containers, execute integration tests, stop containers after tests - docker compose -f test/docker-compose.yml up -d - go test $(GOTEST_FLAGS) -v -race ./...; ret=$$?; \ - docker compose -f test/docker-compose.yml down; \ - exit $$ret - generate: go generate ./... diff --git a/README.md b/README.md index fec5bc6..e298edc 100644 --- a/README.md +++ b/README.md @@ -1,36 +1,44 @@ # Conduit Connector for -[Conduit](https://conduit.io) for . +The HTTP connector is a [Conduit](https://github.com/ConduitIO/conduit) plugin. It provides both, a source +and a destination HTTP connectors. ## How to build? -Run `make build` to build the connector. +Run `make build` to build the connector's binary. ## Testing -Run `make test` to run all the unit tests. Run `make test-integration` to run the integration tests. - -The Docker compose file at `test/docker-compose.yml` can be used to run the required resource locally. +Run `make test` to run all the unit tests. ## Source -A source connector pulls data from an external resource and pushes it to downstream resources via Conduit. +The HTTP source connector pulls data from the HTTP URL every `pollingPeriod`, the source adds the `params` and `headers` +to the request, and sends it to the URL with the specified `method` from the `Configuration`. The returned data is +used to create an openCDC record and return it. + +Note: when using the `OPTIONS` method, the resulted options will be added to the record's metadata. ### Configuration -| name | description | required | default value | -|-----------------------|---------------------------------------|----------|---------------| -| `source_config_param` | Description of `source_config_param`. | true | 1000 | +| name | description | required | default value | +|-----------------|-------------------------------------------------------------------------------------|----------|---------------| +| `url` | Http URL to send requests to. | true | | +| `method` | Http method to use in the request, supported methods are (`GET`,`HEAD`,`OPTIONS`). | false | `GET` | +| `headers` | Http headers to use in the request, comma separated list of `:` separated pairs. | false | | +| `params` | parameters to use in the request, comma separated list of `:` separated pairs. | false | | +| `pollingperiod` | how often the connector will get data from the url, formatted as a `time.Duration`. | false | "5m" | ## Destination -A destination connector pushes data from upstream resources to an external resource via Conduit. +The HTTP destination connector pushes data from upstream resources to an HTTP URL via Conduit. the destination adds the +`params` and `headers` to the request, and sends it to the URL with the specified `method` from the `Configuration`. -### Configuration +Note: The request `Body` that will be sent is the value under `record.Payload.After`, if you want to change the format +of that or manipulate the field in any way, please check our [Builtin Processors Docs](https://conduit.io/docs/processors/builtin/) +, or check [Standalone Processors Docs](https://conduit.io/docs/processors/standalone/) if you'd like to build your own processor . -| name | description | required | default value | -|----------------------------|--------------------------------------------|----------|---------------| -| `destination_config_param` | Description of `destination_config_param`. | true | 1000 | +### Configuration -## Known Issues & Limitations -* Known issue A -* Limitation A +| name | description | required | default value | +|-----------|-------------------------------------------------------------------------------------------|------------|---------------| +| `url` | Http URL to send requests to. | true | | +| `method` | Http method to use in the request, supported methods are (`POST`,`PUT`,`DELETE`,`PATCH`). | false | `POST` | +| `headers` | Http headers to use in the request, comma separated list of : separated pairs. | false | | +| `params` | parameters to use in the request, comma separated list of : separated pairs. | false | | -## Planned work -- [ ] Item A -- [ ] Item B diff --git a/config.go b/config.go new file mode 100644 index 0000000..f1f50d6 --- /dev/null +++ b/config.go @@ -0,0 +1,75 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "fmt" + "net/http" + "net/url" + "strings" +) + +type Config struct { + // Http url to send requests to + URL string `json:"url" validate:"required"` + // Http headers to use in the request, comma separated list of : separated pairs + Headers []string + // parameters to use in the request, comma separated list of : separated pairs + Params []string +} + +func (s Config) addParamsToURL() (string, error) { + parsedURL, err := url.Parse(s.URL) + if err != nil { + return s.URL, fmt.Errorf("error parsing URL: %w", err) + } + // Parse existing query parameters + existingParams := parsedURL.Query() + for _, param := range s.Params { + keyValue := strings.Split(param, ":") + if len(keyValue) != 2 { + return s.URL, fmt.Errorf("invalid %q format", "params") + } + key := keyValue[0] + value := keyValue[1] + existingParams.Add(key, value) + } + // Update query parameters in the URL struct + parsedURL.RawQuery = existingParams.Encode() + + return parsedURL.String(), nil +} + +func (s Config) getHeader() (http.Header, error) { + // create a new empty header + header := http.Header{} + + // iterate over the pairs and add them to the header + for _, pair := range s.Headers { + // split each pair into key and value + parts := strings.SplitN(strings.TrimSpace(pair), ":", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid headers value: %s", pair) + } + + // trim any spaces from the key and value + key := strings.TrimSpace(parts[0]) + value := strings.TrimSpace(parts[1]) + + // Add to header + header.Add(key, value) + } + return header, nil +} diff --git a/config_test.go b/config_test.go new file mode 100644 index 0000000..7ade34d --- /dev/null +++ b/config_test.go @@ -0,0 +1,73 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "net/http" + "testing" + + "github.com/matryer/is" +) + +func TestConfig_URL(t *testing.T) { + is := is.New(t) + config := Config{ + URL: "http://localhost:8082/resource", + Params: []string{"name:resource1", "id:1"}, + } + want := "http://localhost:8082/resource?id=1&name=resource1" + got, _ := config.addParamsToURL() + is.True(got == want) +} + +func TestConfig_URLParams(t *testing.T) { + is := is.New(t) + config := Config{ + // url already has a parameter + URL: "http://localhost:8082/resource?name=resource1", + Params: []string{"id:1"}, + } + want := "http://localhost:8082/resource?id=1&name=resource1" + got, err := config.addParamsToURL() + is.NoErr(err) + is.True(got == want) +} + +func TestConfig_EmptyParams(t *testing.T) { + is := is.New(t) + config := Config{ + URL: "http://localhost:8082/resource?", + Params: []string{"name:resource1", "id:1"}, + } + want := "http://localhost:8082/resource?id=1&name=resource1" + got, err := config.addParamsToURL() + is.NoErr(err) + is.True(got == want) +} + +func TestConfig_Headers(t *testing.T) { + is := is.New(t) + config := Config{ + URL: "http://localhost:8082/resource", + Headers: []string{"header1:val1", "header2:val2"}, + } + want := http.Header{} + want.Add("header1", "val1") + want.Add("header2", "val2") + got, err := config.getHeader() + is.NoErr(err) + is.True(got.Get("header1") == want.Get("header1")) + is.True(got.Get("header2") == want.Get("header2")) +} diff --git a/connector.go b/connector.go index 1ae8eb8..812d918 100644 --- a/connector.go +++ b/connector.go @@ -20,5 +20,5 @@ import sdk "github.com/conduitio/conduit-connector-sdk" var Connector = sdk.Connector{ NewSpecification: Specification, NewSource: NewSource, - NewDestination: nil, + NewDestination: NewDestination, } diff --git a/destination.go b/destination.go new file mode 100644 index 0000000..7e7c2d6 --- /dev/null +++ b/destination.go @@ -0,0 +1,139 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +//go:generate paramgen -output=paramgen_dest.go DestinationConfig + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + + sdk "github.com/conduitio/conduit-connector-sdk" +) + +type Destination struct { + sdk.UnimplementedDestination + + config DestinationConfig + client *http.Client + header http.Header +} + +type DestinationConfig struct { + Config + + // Http method to use in the request + Method string `default:"POST" validate:"inclusion=POST|PUT|DELETE|PATCH"` +} + +func NewDestination() sdk.Destination { + return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...) +} + +func (d *Destination) Parameters() map[string]sdk.Parameter { + return d.config.Parameters() +} + +func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error { + sdk.Logger(ctx).Info().Msg("Configuring Destination...") + var config DestinationConfig + err := sdk.Util.ParseConfig(cfg, &config) + if err != nil { + return fmt.Errorf("invalid config: %w", err) + } + + d.config.URL, err = d.config.addParamsToURL() + if err != nil { + return err + } + d.header, err = config.Config.getHeader() + if err != nil { + return fmt.Errorf("invalid header config: %w", err) + } + d.config = config + return nil +} + +func (d *Destination) Open(ctx context.Context) error { + // create client + d.client = &http.Client{} + + // check connection + req, err := http.NewRequestWithContext(ctx, http.MethodHead, d.config.URL, nil) + if err != nil { + return fmt.Errorf("error creating HTTP request %q: %w", d.config.URL, err) + } + req.Header = d.header + resp, err := d.client.Do(req) + if err != nil { + return fmt.Errorf("error pinging URL %q: %w", d.config.URL, err) + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusUnauthorized { + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + return fmt.Errorf("authorization failed, %s: %s", http.StatusText(http.StatusUnauthorized), string(body)) + } + + return nil +} + +func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) { + for i, rec := range records { + err := d.sendRequest(ctx, rec) + if err != nil { + return i, err + } + } + return 0, nil +} + +func (d *Destination) sendRequest(ctx context.Context, record sdk.Record) error { + var body io.Reader + if record.Payload.After != nil { + body = bytes.NewReader(record.Payload.After.Bytes()) + } + + // create request + req, err := http.NewRequestWithContext(ctx, d.config.Method, d.config.URL, body) + if err != nil { + return fmt.Errorf("error creating HTTP %s request: %w", d.config.Method, err) + } + req.Header = d.header + + // get response + resp, err := d.client.Do(req) + if err != nil { + return fmt.Errorf("error getting data from URL: %w", err) + } + defer resp.Body.Close() + // check if response status is an error code + if resp.StatusCode >= 400 { + return fmt.Errorf("got an unexpected response status of %q", resp.Status) + } + return nil +} + +func (d *Destination) Teardown(ctx context.Context) error { + if d.client != nil { + d.client.CloseIdleConnections() + } + return nil +} diff --git a/destination_test.go b/destination_test.go new file mode 100644 index 0000000..17d2147 --- /dev/null +++ b/destination_test.go @@ -0,0 +1,173 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "testing" + "time" + + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/matryer/is" +) + +var serverRunning bool + +func TestMain(m *testing.M) { + runServer() + os.Exit(m.Run()) +} + +func TestTeardown_NoOpen(t *testing.T) { + con := NewDestination() + err := con.Teardown(context.Background()) + if err != nil { + t.Errorf("expected no error, got %v", err) + } +} + +func TestDestination_Post(t *testing.T) { + is := is.New(t) + runServer() + url := "http://localhost:8081/resource" + ctx := context.Background() + dest := NewDestination() + err := dest.Configure(ctx, map[string]string{ + "url": url, + "method": "POST", + }) + is.NoErr(err) + err = dest.Open(ctx) + is.NoErr(err) + rec := sdk.Record{ + Payload: sdk.Change{ + After: sdk.RawData(`{"id": "2", "name": "Item 2"}`), + }, + } + _, err = dest.Write(ctx, []sdk.Record{rec}) + is.NoErr(err) + _, ok := resources["2"] + is.True(ok) + is.True(resources["2"].Name == "Item 2") +} + +func TestDestination_Delete(t *testing.T) { + is := is.New(t) + runServer() + url := "http://localhost:8081/resource/1" + ctx := context.Background() + dest := NewDestination() + err := dest.Configure(ctx, map[string]string{ + "url": url, + "method": "DELETE", + }) + is.NoErr(err) + err = dest.Open(ctx) + is.NoErr(err) + rec := sdk.Record{} + _, err = dest.Write(ctx, []sdk.Record{rec}) + is.NoErr(err) + _, ok := resources["1"] + // resource was deleted + is.True(!ok) +} + +// resource represents a dummy resource +type resource struct { + ID string `json:"id"` + Name string `json:"name"` +} + +// init with one resource +var resources = map[string]resource{ + "1": {ID: "1", Name: "Item 1"}, +} + +func runServer() { + if serverRunning { + return + } + serverRunning = true + address := ":8081" + + http.HandleFunc("/resource", handleResource) + http.HandleFunc("/resource/", handleSingleResource) + + server := &http.Server{ + Addr: address, + ReadTimeout: 10 * time.Second, // Set your desired read timeout + WriteTimeout: 10 * time.Second, // Set your desired write timeout + } + + go func() { + err := server.ListenAndServe() + if err != nil { + fmt.Printf("Server error: %s\n", err) + } + }() +} + +// handleResource handles POST requests to create a new resource +func handleResource(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var newResource resource + err := json.NewDecoder(r.Body).Decode(&newResource) + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + + resources[newResource.ID] = newResource + w.WriteHeader(http.StatusCreated) +} + +// handleSingleResource handles DELETE, PATCH, and PUT requests for a single resource +func handleSingleResource(w http.ResponseWriter, r *http.Request) { + id := r.URL.Path[len("/resource/"):] + + switch r.Method { + case http.MethodDelete: + delete(resources, id) + w.WriteHeader(http.StatusNoContent) + case http.MethodPatch: + var updatedResource resource + err := json.NewDecoder(r.Body).Decode(&updatedResource) + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + resources[id] = updatedResource + w.WriteHeader(http.StatusOK) + case http.MethodPut: + var newResource resource + err := json.NewDecoder(r.Body).Decode(&newResource) + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + resources[id] = newResource + w.WriteHeader(http.StatusOK) + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} diff --git a/go.mod b/go.mod index 09c8836..563c7cf 100644 --- a/go.mod +++ b/go.mod @@ -2,11 +2,11 @@ module github.com/conduitio-labs/conduit-connector-http go 1.21 -toolchain go1.21.1 - require ( github.com/conduitio/conduit-connector-sdk v0.8.0 github.com/golangci/golangci-lint v1.55.2 + github.com/matryer/is v1.4.1 + golang.org/x/time v0.4.0 ) require ( @@ -117,7 +117,6 @@ require ( github.com/maratori/testableexamples v1.0.0 // indirect github.com/maratori/testpackage v1.1.1 // indirect github.com/matoous/godox v0.0.0-20230222163458-006bad1f9d26 // indirect - github.com/matryer/is v1.4.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect @@ -203,7 +202,6 @@ require ( golang.org/x/sync v0.4.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect - golang.org/x/time v0.4.0 // indirect golang.org/x/tools v0.14.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/grpc v1.56.3 // indirect diff --git a/paramgen_dest.go b/paramgen_dest.go new file mode 100644 index 0000000..e4153b1 --- /dev/null +++ b/paramgen_dest.go @@ -0,0 +1,41 @@ +// Code generated by paramgen. DO NOT EDIT. +// Source: github.com/ConduitIO/conduit-connector-sdk/tree/main/cmd/paramgen + +package http + +import ( + sdk "github.com/conduitio/conduit-connector-sdk" +) + +func (DestinationConfig) Parameters() map[string]sdk.Parameter { + return map[string]sdk.Parameter{ + "headers": { + Default: "", + Description: "Http headers to use in the request, comma separated list of : separated pairs", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{}, + }, + "method": { + Default: "POST", + Description: "Http method to use in the request", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationInclusion{List: []string{"POST", "PUT", "DELETE", "PATCH"}}, + }, + }, + "params": { + Default: "", + Description: "parameters to use in the request, comma separated list of : separated pairs", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{}, + }, + "url": { + Default: "", + Description: "Http url to send requests to", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + }, + }, + } +} diff --git a/paramgen_src.go b/paramgen_src.go index 7796c56..17a30aa 100644 --- a/paramgen_src.go +++ b/paramgen_src.go @@ -1,5 +1,5 @@ // Code generated by paramgen. DO NOT EDIT. -// Source: github.com/conduitio/conduit-connector-sdk/cmd/paramgen +// Source: github.com/ConduitIO/conduit-connector-sdk/tree/main/cmd/paramgen package http @@ -9,9 +9,35 @@ import ( func (SourceConfig) Parameters() map[string]sdk.Parameter { return map[string]sdk.Parameter{ + "headers": { + Default: "", + Description: "Http headers to use in the request, comma separated list of : separated pairs", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{}, + }, + "method": { + Default: "GET", + Description: "Http method to use in the request", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationInclusion{List: []string{"GET", "HEAD", "OPTIONS"}}, + }, + }, + "params": { + Default: "", + Description: "parameters to use in the request, comma separated list of : separated pairs", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{}, + }, + "pollingPeriod": { + Default: "5m", + Description: "how often the connector will get data from the url", + Type: sdk.ParameterTypeDuration, + Validations: []sdk.Validation{}, + }, "url": { Default: "", - Description: "url for the http server", + Description: "Http url to send requests to", Type: sdk.ParameterTypeString, Validations: []sdk.Validation{ sdk.ValidationRequired{}, diff --git a/source.go b/source.go index f32db18..aea30ee 100644 --- a/source.go +++ b/source.go @@ -19,105 +19,151 @@ package http import ( "context" "fmt" + "io" + "net/http" + "strings" + "time" sdk "github.com/conduitio/conduit-connector-sdk" + "golang.org/x/time/rate" ) type Source struct { sdk.UnimplementedSource - config SourceConfig - // TODO Add the client reference here so I could close it as part of the Teardown... + config SourceConfig + client *http.Client + limiter *rate.Limiter + header http.Header } type SourceConfig struct { - // url for the http server - URL string `json:"url" validate:"required"` + Config + // how often the connector will get data from the url + PollingPeriod time.Duration `json:"pollingPeriod" default:"5m"` + // Http method to use in the request + Method string `default:"GET" validate:"inclusion=GET|HEAD|OPTIONS"` } func NewSource() sdk.Source { - // Create Source and wrap it in the default middleware. return sdk.SourceWithMiddleware(&Source{}) } func (s *Source) Parameters() map[string]sdk.Parameter { - // Parameters is a map of named Parameters that describe how to configure - // the Source. Parameters can be generated from SourceConfig with paramgen. return s.config.Parameters() } func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { - // Configure is the first function to be called in a connector. It provides - // the connector with the configuration that can be validated and stored. - // In case the configuration is not valid it should return an error. - // Testing if your connector can reach the configured data source should be - // done in Open, not in Configure. - // The SDK will validate the configuration and populate default values - // before calling Configure. If you need to do more complex validations you - // can do them manually here. - sdk.Logger(ctx).Info().Msg("Configuring Source...") - err := sdk.Util.ParseConfig(cfg, &s.config) + + var config SourceConfig + err := sdk.Util.ParseConfig(cfg, &config) if err != nil { return fmt.Errorf("invalid config: %w", err) } + s.config.URL, err = s.config.addParamsToURL() + if err != nil { + return err + } + s.header, err = config.Config.getHeader() + if err != nil { + return fmt.Errorf("invalid header config: %w", err) + } + s.config = config + return nil } func (s *Source) Open(ctx context.Context, pos sdk.Position) error { - // Open is called after Configure to signal the plugin it can prepare to - // start producing records. If needed, the plugin should open connections in - // this function. The position parameter will contain the position of the - // last record that was successfully processed, Source should therefore - // start producing records after this position. The context passed to Open - // will be cancelled once the plugin receives a stop signal from Conduit. + // create client + s.client = &http.Client{} - // TODO: Where to use the HTTP client, etc... - // connect to the server - // s.config.URL + // check connection + req, err := http.NewRequestWithContext(ctx, http.MethodHead, s.config.URL, nil) + if err != nil { + return fmt.Errorf("error creating HTTP request %q: %w", s.config.URL, err) + } + req.Header = s.header + resp, err := s.client.Do(req) + if err != nil { + return fmt.Errorf("error pinging URL %q: %w", s.config.URL, err) + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusUnauthorized { + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + return fmt.Errorf("authorization failed, %s: %s", http.StatusText(http.StatusUnauthorized), string(body)) + } + s.limiter = rate.NewLimiter(rate.Every(s.config.PollingPeriod), 1) return nil } func (s *Source) Read(ctx context.Context) (sdk.Record, error) { - // Read returns a new Record and is supposed to block until there is either - // a new record or the context gets cancelled. It can also return the error - // ErrBackoffRetry to signal to the SDK it should call Read again with a - // backoff retry. - // If Read receives a cancelled context or the context is cancelled while - // Read is running it must stop retrieving new records from the source - // system and start returning records that have already been buffered. If - // there are no buffered records left Read must return the context error to - // signal a graceful stop. If Read returns ErrBackoffRetry while the context - // is cancelled it will also signal that there are no records left and Read - // won't be called again. - // After Read returns an error the function won't be called again (except if - // the error is ErrBackoffRetry, as mentioned above). - // Read can be called concurrently with Ack. - - // TODO: read data from the source based on the configuration and will return a sdk.Record - // TODO: Maybe process what's returned from my dummy server and return it (as a sdk.Record) - // elaborate a bit more existing response. - - // TODO: Use ErrBackoffRetry when there's nothing new to process. - return sdk.Record{}, nil + err := s.limiter.Wait(ctx) + if err != nil { + return sdk.Record{}, err + } + rec, err := s.getRecord(ctx) + if err != nil { + return sdk.Record{}, fmt.Errorf("error getting data: %w", err) + } + return rec, nil +} + +func (s *Source) getRecord(ctx context.Context) (sdk.Record, error) { + // create GET request + req, err := http.NewRequestWithContext(ctx, s.config.Method, s.config.URL, nil) + if err != nil { + return sdk.Record{}, fmt.Errorf("error creating HTTP request: %w", err) + } + req.Header = s.header + // get response + resp, err := s.client.Do(req) + if err != nil { + return sdk.Record{}, fmt.Errorf("error getting data from URL: %w", err) + } + defer resp.Body.Close() + // check response status + if resp.StatusCode != http.StatusOK { + return sdk.Record{}, fmt.Errorf("response status should be %v, got status=%v", http.StatusOK, resp.StatusCode) + } + // read body + body, err := io.ReadAll(resp.Body) + if err != nil { + return sdk.Record{}, fmt.Errorf("error reading body for response %v: %w", resp, err) + } + // add response header to metadata + meta := sdk.Metadata{} + for key, val := range resp.Header { + meta[key] = strings.Join(val, ",") + } + + // create record + now := time.Now().Unix() + rec := sdk.Record{ + Payload: sdk.Change{ + Before: nil, + After: sdk.RawData(body), + }, + Metadata: meta, + Operation: sdk.OperationCreate, + Position: sdk.Position(fmt.Sprintf("unix-%v", now)), + Key: sdk.RawData(fmt.Sprintf("%v", now)), + } + return rec, nil } func (s *Source) Ack(ctx context.Context, position sdk.Position) error { - // Ack signals to the implementation that the record with the supplied - // position was successfully processed. This method might be called after - // the context of Read is already cancelled, since there might be - // outstanding acks that need to be delivered. When Teardown is called it is - // guaranteed there won't be any more calls to Ack. - // Ack can be called concurrently with Read. + sdk.Logger(ctx).Debug().Str("position", string(position)).Msg("got ack") return nil } -func (s *Source) Teardown(ctx context.Context) error { - // Teardown signals to the plugin that there will be no more calls to any - // other function. After Teardown returns, the plugin should be ready for a - // graceful shutdown. - - // TODO: The opposite to Open (close client, etc...) +func (s *Source) Teardown(context.Context) error { + if s.client != nil { + s.client.CloseIdleConnections() + } return nil } diff --git a/source_test.go b/source_test.go index 7ac2d71..6353dd0 100644 --- a/source_test.go +++ b/source_test.go @@ -16,7 +16,13 @@ package http import ( "context" + "fmt" + "net/http" "testing" + "time" + + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/matryer/is" ) func TestTeardownSource_NoOpen(t *testing.T) { @@ -26,3 +32,115 @@ func TestTeardownSource_NoOpen(t *testing.T) { t.Errorf("expected no error, got %v", err) } } + +func TestSource_Get(t *testing.T) { + is := is.New(t) + ctx := context.Background() + src := NewSource() + _, err := createServer() + is.NoErr(err) + err = src.Configure(ctx, map[string]string{ + "url": "http://localhost:8082/resource/resource1", + "method": "GET", + }) + is.NoErr(err) + err = src.Open(ctx, sdk.Position{}) + is.NoErr(err) + rec, err := src.Read(ctx) + is.NoErr(err) + is.True(string(rec.Payload.After.Bytes()) == "This is resource 1") +} + +func TestSource_Options(t *testing.T) { + is := is.New(t) + ctx := context.Background() + src := NewSource() + _, err := createServer() + is.NoErr(err) + err = src.Configure(ctx, map[string]string{ + "url": "http://localhost:8082/resource/resource1", + "method": "OPTIONS", + }) + is.NoErr(err) + err = src.Open(ctx, sdk.Position{}) + is.NoErr(err) + rec, err := src.Read(ctx) + is.NoErr(err) + meta, ok := rec.Metadata["Allow"] + is.True(ok) + is.Equal(meta, "GET, HEAD, OPTIONS") +} + +func TestSource_Head(t *testing.T) { + is := is.New(t) + ctx := context.Background() + src := NewSource() + _, err := createServer() + is.NoErr(err) + err = src.Configure(ctx, map[string]string{ + "url": "http://localhost:8082/resource/", + "method": "HEAD", + }) + is.NoErr(err) + err = src.Open(ctx, sdk.Position{}) + is.NoErr(err) + _, err = src.Read(ctx) + is.NoErr(err) +} + +// ResourceMap stores resources +var ResourceMap = map[string]string{ + "resource1": "This is resource 1", + "resource2": "This is resource 2", +} + +func createServer() (*http.ServeMux, error) { + // Define the server address + address := ":8082" + + // Create a new HTTP server + server := http.NewServeMux() + + // Handler for GET requests + server.HandleFunc("/resource/", func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + // Extract resource name from URL + resourceName := r.URL.Path[len("/resource/"):] + resource, found := ResourceMap[resourceName] + if !found { + w.WriteHeader(http.StatusNotFound) + return + } + + // Return the resource + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "%s", resource) + case http.MethodHead: + // Respond with headers only + w.WriteHeader(http.StatusOK) + case http.MethodOptions: + // Respond with allowed methods + w.Header().Set("Allow", "GET, HEAD, OPTIONS") + w.WriteHeader(http.StatusOK) + default: + // Method not allowed + w.WriteHeader(http.StatusMethodNotAllowed) + } + }) + + serverInstance := &http.Server{ + Addr: address, + Handler: server, + ReadTimeout: 10 * time.Second, // Set your desired read timeout + WriteTimeout: 10 * time.Second, // Set your desired write timeout + } + // Start the HTTP server + go func() { + err := serverInstance.ListenAndServe() + if err != nil { + fmt.Printf("Server error: %s\n", err) + } + }() + return server, nil +} diff --git a/spec.go b/spec.go index c667f42..176084c 100644 --- a/spec.go +++ b/spec.go @@ -26,8 +26,8 @@ var version = "(devel)" func Specification() sdk.Specification { return sdk.Specification{ Name: "http", - Summary: "HTTP source connector", - Description: "a conduit HTTP source connector that reads from an HTTP server", + Summary: "HTTP source and destination connectors for Conduit.", + Description: "Conduit HTTP source and destination connectors, they connect to an HTTP URL and send HTTP requests.", Version: version, Author: "Meroxa, Inc.", } diff --git a/tools.go b/tools.go index f6e5173..2f445ee 100644 --- a/tools.go +++ b/tools.go @@ -17,5 +17,6 @@ package main import ( + _ "github.com/conduitio/conduit-connector-sdk/cmd/paramgen" _ "github.com/golangci/golangci-lint/cmd/golangci-lint" )