Skip to content

Commit

Permalink
Merge pull request #242 from bonitoo-io/doc/example_cont
Browse files Browse the repository at this point in the history
docs: example and API docs improvements
  • Loading branch information
vlastahajek authored Apr 15, 2021
2 parents f2d4c41 + 035fe56 commit 39e956c
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 30 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
- Setup (onboarding) now sends correctly retentionDuration if specified
- `RetentionRule` used in `Bucket` now contains `ShardGroupDurationSeconds` to specify the shard group duration.

### Documentation
[#242](https://github.com/influxdata/influxdb-client-go/pull/242) Documentation improvements:
- [Custom server API example](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2#example-Client-CustomServerAPICall) now shows how to create DBRP mapping
- Improved documentation about concurrency

## 2.2.3 [2021-04-01]
### Bug fixes
1. [#236](https://github.com/influxdata/influxdb-client-go/pull/236) Setting MaxRetries to zero value disables retry strategy.
Expand Down
86 changes: 86 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This repository contains the reference Go client for InfluxDB 2.
- [Basic Example](#basic-example)
- [Writes in Detail](#writes)
- [Queries in Detail](#queries)
- [Concurrency](#concurrency)
- [InfluxDB 1.8 API compatibility](#influxdb-18-api-compatibility)
- [Contributing](#contributing)
- [License](#license)
Expand Down Expand Up @@ -377,7 +378,92 @@ func main() {
client.Close()
}
```
### Concurrency
InfluxDB Go Client can be used in a concurrent environment. All its functions are thread-safe.
The best practise is to use a single `Client` instance per server URL. This ensures optimized resources usage,
most importantly reusing HTTP connections.
For efficient reuse of HTTP resources among multiple clients, create an HTTP client and use `Options.SetHTTPClient()` for setting it to all clients:
```go
// Create HTTP client
httpClient := &http.Client{
Timeout: time.Second * time.Duration(60),
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 5 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
},
}
// Client for server 1
client1 := influxdb2.NewClientWithOptions("https://server:8086", "my-token", influxdb2.DefaultOptions().SetHTTPClient(httpClient))
// Client for server 2
client2 := influxdb2.NewClientWithOptions("https://server:9999", "my-token2", influxdb2.DefaultOptions().SetHTTPClient(httpClient))
```
Client ensures that there is a single instance of each server API sub-client for the specific area. E.g. a single `WriteAPI` instance for each org/bucket pair,
a single `QueryAPI` for each org.
Such a single API sub-client instance can be used concurrently:
```go
package main
import (
"math/rand"
"sync"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
func main() {
// Create client
client := influxdb2.NewClient("http://localhost:8086", "my-token")
// Ensure closing the client
defer client.Close()
// Get write client
writeApi := client.WriteAPI("my-org", "my-bucket")
// Create channel for points feeding
pointsCh := make(chan *write.Point, 200)
threads := 5
var wg sync.WaitGroup
go func(points int) {
for i := 0; i < points; i++ {
p := influxdb2.NewPoint("meas",
map[string]string{"tag": "tagvalue"},
map[string]interface{}{"val1": rand.Int63n(1000), "val2": rand.Float64()*100.0 - 50.0},
time.Now())
pointsCh <- p
}
close(pointsCh)
}(1000000)
// Launch write routines
for t := 0; t < threads; t++ {
wg.Add(1)
go func() {
for p := range pointsCh {
writeApi.WritePoint(p)
}
wg.Done()
}()
}
// Wait for writes complete
wg.Wait()
}
## InfluxDB 1.8 API compatibility
Expand Down
2 changes: 2 additions & 0 deletions api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
)

// WriteAPI is Write client interface with non-blocking methods for writing time series data asynchronously in batches into an InfluxDB server.
// WriteAPI can be used concurrently.
// When using multiple goroutines for writing, use a single WriteAPI instance in all goroutines.
type WriteAPI interface {
// WriteRecord writes asynchronously line protocol record into bucket.
// WriteRecord adds record into the buffer which is sent on the background when it reaches the batch size.
Expand Down
4 changes: 4 additions & 0 deletions api/writeAPIBlocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (

// WriteAPIBlocking offers blocking methods for writing time series data synchronously into an InfluxDB server.
// It doesn't implicitly create batches of points. It is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches.
//
// WriteAPIBlocking can be used concurrently.
// When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines.
//
// To add implicit batching, use a wrapper, such as:
// type writer struct {
// batch []*write.Point
Expand Down
9 changes: 6 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ type Client interface {
ServerURL() string
// HTTPService returns underlying HTTP service object used by client
HTTPService() http.Service
// WriteAPI returns the asynchronous, non-blocking, Write client
// WriteAPI returns the asynchronous, non-blocking, Write client.
// Ensures using a single WriteAPI instance for each org/bucket pair.
WriteAPI(org, bucket string) api.WriteAPI
// WriteAPIBlocking returns the synchronous, blocking, Write client
// WriteAPIBlocking returns the synchronous, blocking, Write client.
// Ensures using a single WriteAPIBlocking instance for each org/bucket pair.
WriteAPIBlocking(org, bucket string) api.WriteAPIBlocking
// QueryAPI returns Query client
// QueryAPI returns Query client.
// Ensures using a single QueryAPI instance each org.
QueryAPI(org string) api.QueryAPI
// AuthorizationsAPI returns Authorizations API client.
AuthorizationsAPI() api.AuthorizationsAPI
Expand Down
62 changes: 35 additions & 27 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,50 +31,58 @@ func ExampleClient_newClientWithOptions() {
}

func ExampleClient_customServerAPICall() {
// Create a new client using an InfluxDB server base URL and empty token
// This example shows how to perform custom server API invocation for any endpoint
// Here we will create a DBRP mapping which allows using buckets in legacy write and query (InfluxQL) endpoints

// Create client. You need an admin token for creating DBRP mapping
client := influxdb2.NewClient("http://localhost:8086", "my-token")

// Always close client at the end
defer client.Close()

// Get generated client for server API calls
apiClient := domain.NewClientWithResponses(client.HTTPService())
// Get an organization that will own task
org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org")
ctx := context.Background()

// Get a bucket we would like to query using InfluxQL
b, err := client.BucketsAPI().FindBucketByName(ctx, "my-bucket")
if err != nil {
panic(err)
}
// Get an organization that will own the mapping
o, err := client.OrganizationsAPI().FindOrganizationByName(ctx, "my-org")
if err != nil {
//return err
panic(err)
}

// Basic task properties
taskDescription := "Example task"
taskFlux := `option task = {
name: "My task",
every: 1h
}
from(bucket:"my-bucket") |> range(start: -1m) |> last()`
taskStatus := domain.TaskStatusTypeActive

// Create TaskCreateRequest object
taskRequest := domain.TaskCreateRequest{
Org: &org.Name,
OrgID: org.Id,
Description: &taskDescription,
Flux: taskFlux,
Status: &taskStatus,
yes := true
db := "my-bucket"
rp := "autogen"
// Fill required fields of the DBRP struct
dbrp := domain.DBRP{
BucketID: b.Id,
Database: &db,
Default: &yes,
OrgID: o.Id,
RetentionPolicy: &rp,
}

// Issue an API call
resp, err := apiClient.PostTasksWithResponse(context.Background(), &domain.PostTasksParams{}, domain.PostTasksJSONRequestBody(taskRequest))
params := &domain.PostDBRPParams{}
// Call server API
resp, err := apiClient.PostDBRPWithResponse(ctx, params, domain.PostDBRPJSONRequestBody(dbrp))
if err != nil {
panic(err)
}

// Always check generated response errors
// Check generated response errors
if resp.JSONDefault != nil {
panic(resp.JSONDefault.Message)
}
// Check generated response errors
if resp.JSON400 != nil {
panic(resp.JSON400.Message)
}

// Use API call result
task := resp.JSON201
fmt.Println("Created task: ", task.Name)
newDbrp := resp.JSON201
fmt.Printf("Created DBRP: %#v\n", newDbrp)
}

0 comments on commit 39e956c

Please sign in to comment.