Skip to content

Commit

Permalink
Add Solr 3 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
mkboudreau committed Jan 17, 2018
1 parent ad921a3 commit 45acf5f
Show file tree
Hide file tree
Showing 4 changed files with 930 additions and 49 deletions.
2 changes: 2 additions & 0 deletions plugins/inputs/solr/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The [solr](http://lucene.apache.org/solr/) plugin collects stats via the

More about [performance statistics](https://cwiki.apache.org/confluence/display/solr/Performance+Statistics+Reference)

Tested from 3.5 to 6.*

### Configuration:

```
Expand Down
105 changes: 73 additions & 32 deletions plugins/inputs/solr/solr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package solr
import (
"encoding/json"
"fmt"
"math"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -78,22 +79,7 @@ type Core struct {
// QueryHandler is an exported type that
// contains query handler metrics
type QueryHandler struct {
Stats struct {
One5minRateReqsPerSecond float64 `json:"15minRateReqsPerSecond"`
FiveMinRateReqsPerSecond float64 `json:"5minRateReqsPerSecond"`
Seven5thPcRequestTime float64 `json:"75thPcRequestTime"`
Nine5thPcRequestTime float64 `json:"95thPcRequestTime"`
Nine99thPcRequestTime float64 `json:"999thPcRequestTime"`
Nine9thPcRequestTime float64 `json:"99thPcRequestTime"`
AvgRequestsPerSecond float64 `json:"avgRequestsPerSecond"`
AvgTimePerRequest float64 `json:"avgTimePerRequest"`
Errors int64 `json:"errors"`
HandlerStart int64 `json:"handlerStart"`
MedianRequestTime float64 `json:"medianRequestTime"`
Requests int64 `json:"requests"`
Timeouts int64 `json:"timeouts"`
TotalTime float64 `json:"totalTime"`
} `json:"stats"`
Stats interface{} `json:"stats"`
}

// UpdateHandler is an exported type that
Expand Down Expand Up @@ -286,22 +272,22 @@ func addQueryHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansDa
}

for name, metrics := range queryMetrics {
coreFields := map[string]interface{}{
"15min_rate_reqs_per_second": metrics.Stats.One5minRateReqsPerSecond,
"5min_rate_reqs_per_second": metrics.Stats.FiveMinRateReqsPerSecond,
"75th_pc_request_time": metrics.Stats.Seven5thPcRequestTime,
"95th_pc_request_time": metrics.Stats.Nine5thPcRequestTime,
"999th_pc_request_time": metrics.Stats.Nine99thPcRequestTime,
"99th_pc_request_time": metrics.Stats.Nine9thPcRequestTime,
"avg_requests_per_second": metrics.Stats.AvgRequestsPerSecond,
"avg_time_per_request": metrics.Stats.AvgTimePerRequest,
"errors": metrics.Stats.Errors,
"handler_start": metrics.Stats.HandlerStart,
"median_request_time": metrics.Stats.MedianRequestTime,
"requests": metrics.Stats.Requests,
"timeouts": metrics.Stats.Timeouts,
"total_time": metrics.Stats.TotalTime,
var coreFields map[string]interface{}

if metrics.Stats == nil {
continue
}

switch v := metrics.Stats.(type) {
case []interface{}:
m := convertArrayToMap(v)
coreFields = convertQueryHandlerMap(m)
case map[string]interface{}:
coreFields = convertQueryHandlerMap(v)
default:
continue
}

acc.AddFields(
"solr_queryhandler",
coreFields,
Expand All @@ -310,10 +296,44 @@ func addQueryHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansDa
"handler": name},
time,
)

}
return nil
}

func convertArrayToMap(values []interface{}) map[string]interface{} {
var key string
result := make(map[string]interface{})
for i, item := range values {
if i%2 == 0 {
key = fmt.Sprintf("%v", item)
} else {
result[key] = item
}
}

return result
}

func convertQueryHandlerMap(value map[string]interface{}) map[string]interface{} {
return map[string]interface{}{
"15min_rate_reqs_per_second": getFloat(value["15minRateReqsPerSecond"]),
"5min_rate_reqs_per_second": getFloat(value["5minRateReqsPerSecond"]),
"75th_pc_request_time": getFloat(value["75thPcRequestTime"]),
"95th_pc_request_time": getFloat(value["95thPcRequestTime"]),
"99th_pc_request_time": getFloat(value["99thPcRequestTime"]),
"999th_pc_request_time": getFloat(value["999thPcRequestTime"]),
"avg_requests_per_second": getFloat(value["avgRequestsPerSecond"]),
"avg_time_per_request": getFloat(value["avgTimePerRequest"]),
"errors": getInt(value["errors"]),
"handler_start": getInt(value["handlerStart"]),
"median_request_time": getFloat(value["medianRequestTime"]),
"requests": getInt(value["requests"]),
"timeouts": getInt(value["timeouts"]),
"total_time": getFloat(value["totalTime"]),
}
}

// Add update metrics section to accumulator
func addUpdateHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
var updateMetrics map[string]UpdateHandler
Expand Down Expand Up @@ -366,13 +386,34 @@ func getFloat(unk interface{}) float64 {
case float64:
return i
case string:
f, _ := strconv.ParseFloat(i, 64)
f, err := strconv.ParseFloat(i, 64)
if err != nil || math.IsNaN(f) {
return float64(0)
}
return f
default:
return float64(0)
}
}

// Get int64 from interface
func getInt(unk interface{}) int64 {
switch i := unk.(type) {
case int64:
return i
case float64:
return int64(i)
case string:
v, err := strconv.ParseInt(i, 10, 64)
if err != nil {
return int64(0)
}
return v
default:
return int64(0)
}
}

// Add cache metrics section to accumulator
func addCacheMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
if len(mBeansData.SolrMbeans) < 8 {
Expand Down
83 changes: 66 additions & 17 deletions plugins/inputs/solr/solr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,37 @@ func TestGatherStats(t *testing.T) {
map[string]string{"core": "main", "handler": "filterCache"})
}

func createMockServer() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "/solr/admin/cores") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, statusResponse)
} else if strings.Contains(r.URL.Path, "solr/main/admin") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, mBeansMainResponse)
} else if strings.Contains(r.URL.Path, "solr/core1/admin") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, mBeansCore1Response)
} else {
w.WriteHeader(http.StatusNotFound)
fmt.Fprintln(w, "nope")
}
}))
}
func TestSolr3GatherStats(t *testing.T) {
ts := createMockSolr3Server()
solr := NewSolr()
solr.Servers = []string{ts.URL}
var acc testutil.Accumulator
require.NoError(t, solr.Gather(&acc))

acc.AssertContainsTaggedFields(t, "solr_admin",
solrAdminMainCoreStatusExpected,
map[string]string{"core": "main"})

acc.AssertContainsTaggedFields(t, "solr_admin",
solrAdminCore1StatusExpected,
map[string]string{"core": "core1"})

acc.AssertContainsTaggedFields(t, "solr_core",
solr3CoreExpected,
map[string]string{"core": "main", "handler": "searcher"})

acc.AssertContainsTaggedFields(t, "solr_queryhandler",
solr3QueryHandlerExpected,
map[string]string{"core": "main", "handler": "org.apache.solr.handler.component.SearchHandler"})

acc.AssertContainsTaggedFields(t, "solr_updatehandler",
solr3UpdateHandlerExpected,
map[string]string{"core": "main", "handler": "updateHandler"})

acc.AssertContainsTaggedFields(t, "solr_cache",
solr3CacheExpected,
map[string]string{"core": "main", "handler": "filterCache"})
}
func TestNoCoreDataHandling(t *testing.T) {
ts := createMockNoCoreDataServer()
solr := NewSolr()
Expand All @@ -83,6 +96,24 @@ func TestNoCoreDataHandling(t *testing.T) {

}

func createMockServer() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "/solr/admin/cores") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, statusResponse)
} else if strings.Contains(r.URL.Path, "solr/main/admin") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, mBeansMainResponse)
} else if strings.Contains(r.URL.Path, "solr/core1/admin") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, mBeansCore1Response)
} else {
w.WriteHeader(http.StatusNotFound)
fmt.Fprintln(w, "nope")
}
}))
}

func createMockNoCoreDataServer() *httptest.Server {
var nodata string
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -101,3 +132,21 @@ func createMockNoCoreDataServer() *httptest.Server {
}
}))
}

func createMockSolr3Server() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "/solr/admin/cores") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, statusResponse)
} else if strings.Contains(r.URL.Path, "solr/main/admin") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, mBeansSolr3MainResponse)
} else if strings.Contains(r.URL.Path, "solr/core1/admin") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, mBeansSolr3MainResponse)
} else {
w.WriteHeader(http.StatusNotFound)
fmt.Fprintln(w, "nope")
}
}))
}
Loading

0 comments on commit 45acf5f

Please sign in to comment.