Skip to content
This repository has been archived by the owner on Aug 22, 2024. It is now read-only.

Commit

Permalink
Reorganize auto-discovery code, add error checking, comments, tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
lesovsky committed May 18, 2021
1 parent 625ccfa commit 514381a
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 93 deletions.
160 changes: 90 additions & 70 deletions internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,55 +300,40 @@ func (repo *Repository) lookupServices(config Config) error {

// walk through the pid list and looking for the processes with appropriate names
for _, pid := range pids {
proc, err := process.NewProcess(pid)
if err != nil {
log.Debugf("auto-discovery: failed to create process struct for pid %d: %s; skip", pid, err)

// Check process, and get its properties.
name, cwd, cmdline, skip := checkProcessProperties(pid)
if skip {
continue
}

name, err := proc.Name()
if err != nil {
log.Debugf("auto-discovery: no process name for pid %d: %s; skip", pid, err)
continue // skip processes with no names
}
var service Service
var err error

switch name {
case "postgres":
ppid, _ := proc.Ppid() // error doesn't matter here, even if ppid will be 0 - we're interested in ppid == 1
if ppid == 1 {
postgres, err := discoverPostgres(proc, config)
if err != nil {
log.Warnf("auto-discovery [postgres]: discovery failed: %s; skip", err)
break
}

// check service in the repo
if s := repo.getService(postgres.ServiceID); s.ServiceID == postgres.ServiceID {
log.Debugf("auto-discovery [postgres]: service [%s] already in the repository, skip", s.ServiceID)
break
}

repo.addService(postgres) // add postgresql service to the repo
log.Infof("auto-discovery [postgres]: service added [%s]", postgres.ServiceID)
}
service, err = discoverPostgres(pid, cwd, config)
case "pgbouncer":
pgbouncer, err := discoverPgbouncer(proc, config)
if err != nil {
log.Warnf("auto-discovery [pgbouncer]: discovery failed: %s; skip", err)
break
}
service, err = discoverPgbouncer(pid, cmdline, config)
default:
continue
}

// check service in the repo
if s := repo.getService(pgbouncer.ServiceID); s.ServiceID == pgbouncer.ServiceID {
log.Debugf("auto-discovery [pgbouncer]: service [%s] already in the repository, skip", s.ServiceID)
break
}
if err != nil {
log.Warnf("auto-discovery [%s]: discovery failed: %s; skip", name, err)
continue
}

repo.addService(pgbouncer) // add pgbouncer service to the repo
log.Infof("auto-discovery [pgbouncer]: service added [%s]", pgbouncer.ServiceID)
default:
continue // others are not interesting
// Check service is not present in the repo.
if s := repo.getService(service.ServiceID); s.ServiceID == service.ServiceID {
log.Debugf("auto-discovery [%s]: service [%s] already in the repository, skip", name, s.ServiceID)
continue
}

// Add postgresql service to the repo.
repo.addService(service)

log.Infof("auto-discovery [%s]: service added [%s]", name, service.ServiceID)
}
return nil
}
Expand Down Expand Up @@ -444,17 +429,14 @@ func (repo *Repository) healthcheckServices() {

// discoverPostgres reads postmaster.pid stored in data directory.
// Using postmaster.pid data construct "conninfo" string and test it through making a connection.
func discoverPostgres(proc *process.Process, config Config) (Service, error) {
log.Debugf("auto-discovery [postgres]: analyzing process with pid %d", proc.Pid)
func discoverPostgres(pid int32, cwd string, config Config) (Service, error) {
log.Debugf("auto-discovery [postgres]: analyzing process with pid %d", pid)

// Postgres at startup change current working directory to the data directory.
// Use it for find postmaster.pid.
datadirCmdPath, err := proc.Cwd()
if err != nil {
return Service{}, err
}
var err error

connParams, err := newPostgresConnectionParams(datadirCmdPath + "/postmaster.pid")
// Postgres always use data directory as current working directory.
// Use it for find postmaster.pid.
connParams, err := newPostgresConnectionParams(cwd + "/postmaster.pid")
if err != nil {
return Service{}, err
}
Expand All @@ -464,10 +446,18 @@ func discoverPostgres(proc *process.Process, config Config) (Service, error) {
var connString string
for _, v := range []bool{true, false} {
connString = newPostgresConnectionString(connParams, config.ConnDefaults, v)
if err := attemptConnect(connString); err == nil {
// no need to continue because connection with created connString was successful
break
err = attemptConnect(connString)
if err != nil {
connString = ""
continue
}

// no need to continue because connection with created connString was successful
break
}

if connString == "" || err != nil {
return Service{}, err
}

s := Service{
Expand All @@ -476,20 +466,10 @@ func discoverPostgres(proc *process.Process, config Config) (Service, error) {
Collector: nil,
}

log.Debugf("auto-discovery [postgres]: service has been found, pid %d, available through %s", proc.Pid, connString)
log.Debugf("auto-discovery [postgres]: service has been found, pid %d, available through %s", pid, connString)
return s, nil
}

// parsePostgresProcessCmdline parses postgres process cmdline for data directory argument
func parsePostgresProcessCmdline(cmdline []string) (string, error) {
for i, arg := range cmdline {
if arg == "-D" && len(cmdline) > i+1 {
return cmdline[i+1], nil
}
}
return "", fmt.Errorf("data directory argument not found")
}

// newPostgresConnectionParams reads connection parameters from postmaster.pid
func newPostgresConnectionParams(pidFilePath string) (connectionParams, error) {
p := connectionParams{}
Expand Down Expand Up @@ -577,13 +557,8 @@ func newPostgresConnectionString(connParams connectionParams, defaults map[strin
}

// discoverPgbouncer check passed process is it a Pgbouncer process or not.
func discoverPgbouncer(proc *process.Process, config Config) (Service, error) {
log.Debugf("auto-discovery [pgbouncer]: analyzing process with pid %d", proc.Pid)

cmdline, err := proc.Cmdline()
if err != nil {
return Service{}, err
}
func discoverPgbouncer(pid int32, cmdline string, config Config) (Service, error) {
log.Debugf("auto-discovery [pgbouncer]: analyzing process with pid %d", pid)

if len(cmdline) == 0 {
return Service{}, fmt.Errorf("empty cmdline")
Expand All @@ -610,7 +585,7 @@ func discoverPgbouncer(proc *process.Process, config Config) (Service, error) {
Collector: nil,
}

log.Debugf("auto-discovery: pgbouncer service has been found, pid %d, available through %s:%d", proc.Pid, connParams.listenAddr, connParams.listenPort)
log.Debugf("auto-discovery: pgbouncer service has been found, pid %d, available through %s:%d", pid, connParams.listenAddr, connParams.listenPort)
return s, nil
}

Expand Down Expand Up @@ -738,6 +713,51 @@ func parsePgbouncerCmdline(cmdline string) string {
return ""
}

// checkProcessProperties check process properties and returns necessary properties if process valid.
func checkProcessProperties(pid int32) (string, string, string, bool) {
proc, err := process.NewProcess(pid)
if err != nil {
log.Debugf("auto-discovery: create process object for pid %d failed: %s; skip", pid, err)
return "", "", "", true
}

ppid, err := proc.Ppid()
if err != nil {
log.Debugf("auto-discovery: get parent pid for pid %d failed: %s; skip", pid, err)
return "", "", "", true
}

// Skip processes which are not children of init.
if ppid != 1 {
return "", "", "", true
}

name, err := proc.Name()
if err != nil {
log.Debugf("auto-discovery: read name for pid %d failed: %s; skip", pid, err)
return "", "", "", true
}

// Skip processes which are not Postgres or Pgbouncer.
if name != "postgres" && name != "pgbouncer" {
return "", "", "", true
}

cwd, err := proc.Cwd()
if err != nil {
log.Infof("auto-discovery: read cwd for pid %d failed: %s; skip", pid, err)
return "", "", "", true
}

cmdline, err := proc.Cmdline()
if err != nil {
log.Infof("auto-discovery: read cmdline for pid %d failed: %s; skip", pid, err)
return "", "", "", true
}

return name, cwd, cmdline, false
}

// stringsContains returns true if array of strings contains specific string
func stringsContains(ss []string, s string) bool {
for _, val := range ss {
Expand Down
47 changes: 24 additions & 23 deletions internal/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service
import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/shirou/gopsutil/process"
"github.com/stretchr/testify/assert"
"github.com/weaponry/pgscv/internal/log"
"github.com/weaponry/pgscv/internal/model"
Expand Down Expand Up @@ -188,29 +189,6 @@ func Test_healthcheckServices(t *testing.T) {
assert.Equal(t, 1, r.totalServices())
}

func Test_parsePostgresProcessCmdline(t *testing.T) {
testCases := []struct {
valid bool
payload []string
expected string
}{
{valid: true, payload: []string{"/bin/postgres", "-D", "/data", "-c", "config_file=/postgresql.conf"}, expected: "/data"},
{valid: false, payload: []string{"/bin/true", "-f", "config_file=/postgresql.conf"}},
{valid: false, payload: []string{"/bin/true", "-D"}},
}

for _, tc := range testCases {
s, err := parsePostgresProcessCmdline(tc.payload)
if tc.valid {
assert.NoError(t, err)
assert.Equal(t, tc.expected, s)
} else {
assert.Error(t, err)
assert.Empty(t, s)
}
}
}

func Test_newPostgresConnectionParams(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -505,6 +483,29 @@ func Test_parsePgbouncerCmdline(t *testing.T) {
}
}

func Test_checkProcessProperties(t *testing.T) {
pids, err := process.Pids()
assert.NoError(t, err)

// add invalid pid
pids = append(pids, 0)

for _, pid := range pids {
name, cwd, cmdline, skip := checkProcessProperties(pid)

switch name {
case "postgres":
assert.NotEqual(t, "", cwd)
assert.False(t, skip)
case "pgbouncer":
assert.NotEqual(t, "", cmdline)
assert.False(t, skip)
default:
assert.True(t, skip)
}
}
}

func Test_stringsContains(t *testing.T) {
ss := []string{"first_example_string", "second_example_string", "third_example_string"}

Expand Down

0 comments on commit 514381a

Please sign in to comment.