Skip to content

Commit

Permalink
Implement sgdops upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel Saratura committed Jan 15, 2025
1 parent 5f38133 commit dfdf9d7
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 132 deletions.
22 changes: 18 additions & 4 deletions cmd/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package cmd
import (
"context"
"fmt"
"github.com/vshn/appcat/v4/pkg/auth/stackgres"
"net/http"
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/thediveo/enumflag/v2"
"github.com/vshn/appcat/v4/pkg"
"github.com/vshn/appcat/v4/pkg/auth"
"github.com/vshn/appcat/v4/pkg/maintenance"
"github.com/vshn/appcat/v4/pkg/maintenance/auth"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -86,11 +87,24 @@ func (c *controller) runMaintenance(cmd *cobra.Command, _ []string) error {
if sgNamespace == "" {
return fmt.Errorf("missing environment variable: %s", "SG_NAMESPACE")
}
apiPassword := viper.GetString("API_PASSWORD")
if apiPassword == "" {
return fmt.Errorf("missing environment variable: %s", "API_PASSWORD")
}

apiUserName := viper.GetString("API_USERNAME")
if apiUserName == "" {
return fmt.Errorf("missing environment variable: %s", "API_USERNAME")
}

sClient, err := stackgres.New(apiUserName, apiPassword, sgNamespace)
if err != nil {
return err
}
m = &maintenance.PostgreSQL{
Client: kubeClient,
SgURL: "https://stackgres-restapi." + sgNamespace + ".svc",
MaintTimeout: time.Hour,
Client: kubeClient,
StackgresClient: sClient,
MaintTimeout: time.Hour,
}
case redis:
m = maintenance.NewRedis(kubeClient, getHTTPClient())
Expand Down
3 changes: 1 addition & 2 deletions pkg/maintenance/auth/http.go → pkg/auth/http.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package auth

import (
"k8s.io/client-go/transport"
"net/http"
"time"

"k8s.io/client-go/transport"
)

// GetAuthHTTPClient returns a HTTP client which is authenticated. It can be used to query private images.
Expand Down
114 changes: 114 additions & 0 deletions pkg/auth/stackgres/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package stackgres

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/hashicorp/go-version"
"net/http"
"sort"
)

type loginRequest struct {
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}

type authToken struct {
AccessToken string `json:"access_token"`
TokenType string `json:"token_type"`
ExpiresIn int `json:"expires_in"`
}
type PgVersions struct {
Postgresql []string `json:"postgresql"`
}

type StackgresClient struct {
username, password, sgNamespace, prefixUrl string
httpClient *http.Client
token authToken
}

func New(username, password, sgNamespace string) (*StackgresClient, error) {
t := http.DefaultTransport.(*http.Transport).Clone()
t.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
httpClient := &http.Client{Transport: t}

auth := loginRequest{Username: username, Password: password}

byteAuth, err := json.Marshal(auth)
if err != nil {
return nil, fmt.Errorf("cannot marshal login json: %w", err)
}
prefixUrl := "https://stackgres-restapi." + sgNamespace + ".svc"
resp, err := httpClient.Post(prefixUrl+"/stackgres/auth/login", "application/json", bytes.NewBuffer(byteAuth))
if err != nil {
return nil, fmt.Errorf("cannot login: %w", err)
}

token := &authToken{}
err = json.NewDecoder(resp.Body).Decode(token)
if err != nil {
return nil, fmt.Errorf("cannot decode login token: %w", err)
}

return &StackgresClient{
sgNamespace: sgNamespace,
username: username,
password: password,
}, nil
}

// GetAvailableVersions fetches all available versions
func (c StackgresClient) GetAvailableVersions() (*PgVersions, error) {
req, err := http.NewRequest("GET", c.prefixUrl+"/stackgres/version/postgresql", nil)
if err != nil {
return nil, fmt.Errorf("cannot get list of versions: %w", err)
}
req.Header.Add("Authorization", "Bearer "+c.token.AccessToken)

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error during http request: %w", err)
}
versionList := &PgVersions{}

err = json.NewDecoder(resp.Body).Decode(versionList)
if err != nil {
return nil, fmt.Errorf("error during json decoding: %w", err)
}
return versionList, nil
}

func GetLatestMinorVersion(vers string, versionList *PgVersions) (string, error) {

if versionList == nil {
return vers, nil
}

//p.log.Info("Searching most current minor version")
current, err := version.NewVersion(vers)
if err != nil {
return "", err
}

validVersions := make([]*version.Version, 0)
for _, newVersion := range versionList.Postgresql {
tmpVersion, err := version.NewVersion(newVersion)
if err != nil {
return "", err
}
if tmpVersion.Segments()[0] == current.Segments()[0] {
validVersions = append(validVersions, tmpVersion)
}
}

sort.Sort(sort.Reverse(version.Collection(validVersions)))

if len(validVersions) != 0 && current.LessThan(validVersions[0]) {
return validVersions[0].Original(), nil
}

return current.Original(), nil
}
75 changes: 70 additions & 5 deletions pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,38 @@ package vshnpostgres

import (
"context"
"encoding/base64"
"errors"
"fmt"
xfnproto "github.com/crossplane/function-sdk-go/proto/v1beta1"
stackgresv1 "github.com/vshn/appcat/v4/apis/stackgres/v1"
vshnv1 "github.com/vshn/appcat/v4/apis/vshn/v1"
"github.com/vshn/appcat/v4/pkg"
"github.com/vshn/appcat/v4/pkg/auth/stackgres"
"github.com/vshn/appcat/v4/pkg/comp-functions/runtime"
v2 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
pointer "k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
majorUpgradeSuffix = "-major-upgrade-dbops"
)

type ExtensionResponse struct {
Extensions []Extension `json:"extensions"`
}

type Extension struct {
Name string `json:"name"`
Publisher string `json:"publisher"`
Repository string `json:"repository"`
Versions []string `json:"versions"`
}

func MajorVersionUpgrade(ctx context.Context, comp *vshnv1.VSHNPostgreSQL, svc *runtime.ServiceRuntime) *xfnproto.Result {
comp, err := getVSHNPostgreSQL(ctx, svc)

Expand All @@ -36,7 +54,7 @@ func MajorVersionUpgrade(ctx context.Context, comp *vshnv1.VSHNPostgreSQL, svc *
if currentV != "" && currentV != expectedV {
// If SGDBOps resource does not exist create it otherwise cleanup if successful or keep the resource on fail
if errors.Is(err, runtime.ErrNotFound) {
return createMajorUpgradeSgDbOps(svc, comp, expectedV)
return createMajorUpgradeSgDbOps(ctx, svc, comp, expectedV)
} else if isSuccessful(majorUpgradeDbOps.Status.Conditions) {
return cleanUp(svc, comp, expectedV)
} else {
Expand Down Expand Up @@ -79,7 +97,7 @@ func isSuccessful(conditions *[]stackgresv1.SGDbOpsStatusConditionsItem) bool {
return successful && completed
}

func createMajorUpgradeSgDbOps(svc *runtime.ServiceRuntime, comp *vshnv1.VSHNPostgreSQL, expectedV string) *xfnproto.Result {
func createMajorUpgradeSgDbOps(ctx context.Context, svc *runtime.ServiceRuntime, comp *vshnv1.VSHNPostgreSQL, expectedV string) *xfnproto.Result {
cluster := &stackgresv1.SGCluster{}
err := svc.GetObservedKubeObject(cluster, "cluster")
if err != nil {
Expand All @@ -91,11 +109,31 @@ func createMajorUpgradeSgDbOps(svc *runtime.ServiceRuntime, comp *vshnv1.VSHNPos
if err != nil {
return runtime.NewWarningResult(fmt.Sprintf("cannot get observed kube object postgres config: %w", err))

Check failure on line 110 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / test

fmt.Sprintf does not support error-wrapping directive %w

Check failure on line 110 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / go-build

fmt.Sprintf does not support error-wrapping directive %w

Check failure on line 110 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / go-lint

fmt.Sprintf does not support error-wrapping directive %w
}
conf.SetName(conf.GetName() + "-" + expectedV)
conf.Spec.PostgresVersion = expectedV
err = svc.SetDesiredKubeObject(conf, comp.GetName()+"-"+configResourceName)
if err != nil {
return runtime.NewWarningResult(fmt.Sprintf("cannot set observed kube object postgres config %s", comp.GetName()))
}
sgNamespace := svc.Config.Data["sgNamespace"]
username, password, err := getCredentials(ctx, sgNamespace)
if err != nil {
return runtime.NewWarningResult(fmt.Sprintf("cannot get stackgres rest api credentials: %w", err))

Check failure on line 121 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / test

fmt.Sprintf does not support error-wrapping directive %w

Check failure on line 121 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / go-build

fmt.Sprintf does not support error-wrapping directive %w

Check failure on line 121 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / go-lint

fmt.Sprintf does not support error-wrapping directive %w
}
stackgresClient, err := stackgres.New(username, password, sgNamespace)
if err != nil {
return runtime.NewWarningResult(fmt.Sprintf("cannot initialize stackgres client: %w", err))

Check failure on line 125 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / test

fmt.Sprintf does not support error-wrapping directive %w

Check failure on line 125 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / go-build

fmt.Sprintf does not support error-wrapping directive %w

Check failure on line 125 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / go-lint

fmt.Sprintf does not support error-wrapping directive %w
}

vList, err := stackgresClient.GetAvailableVersions()
if err != nil {
return runtime.NewWarningResult(fmt.Sprintf("cannot get postgres version list: %w", err))

Check failure on line 130 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / test

fmt.Sprintf does not support error-wrapping directive %w

Check failure on line 130 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / go-build

fmt.Sprintf does not support error-wrapping directive %w

Check failure on line 130 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / go-lint

fmt.Sprintf does not support error-wrapping directive %w
}

majorMinorVersion, err := stackgres.GetLatestMinorVersion(expectedV, vList)
if err != nil {
return runtime.NewWarningResult(fmt.Sprintf("cannot get latest minor version: %w", err))

Check failure on line 135 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / test

fmt.Sprintf does not support error-wrapping directive %w

Check failure on line 135 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / go-build

fmt.Sprintf does not support error-wrapping directive %w

Check failure on line 135 in pkg/comp-functions/functions/vshnpostgres/major_version_upgrade.go

View workflow job for this annotation

GitHub Actions / go-lint

fmt.Sprintf does not support error-wrapping directive %w
}

sgdbops := &stackgresv1.SGDbOps{
ObjectMeta: v1.ObjectMeta{
Expand All @@ -104,10 +142,8 @@ func createMajorUpgradeSgDbOps(svc *runtime.ServiceRuntime, comp *vshnv1.VSHNPos
},
Spec: stackgresv1.SGDbOpsSpec{
MajorVersionUpgrade: &stackgresv1.SGDbOpsSpecMajorVersionUpgrade{
Check: pointer.To(true),
Clone: nil,
Link: pointer.To(true),
PostgresVersion: &expectedV,
PostgresVersion: &majorMinorVersion,
SgPostgresConfig: pointer.To(conf.GetName()),
},
Op: "majorVersionUpgrade",
Expand All @@ -120,3 +156,32 @@ func createMajorUpgradeSgDbOps(svc *runtime.ServiceRuntime, comp *vshnv1.VSHNPos
}
return runtime.NewNormalResult("SGDBOps for major upgrade created")
}

func getCredentials(ctx context.Context, sgNamespace string) (username, password string, err error) {
kubeClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{
Scheme: pkg.SetupScheme(),
})
if err != nil {
return "", "", err
}

c := &v2.Secret{}
err = kubeClient.Get(ctx, types.NamespacedName{
Namespace: sgNamespace,
Name: "stackgres-restapi-admin",
}, c)
if err != nil {
return "", "", err
}

u, err := base64.StdEncoding.DecodeString(string(c.Data["k8sUsername"]))
if err != nil {
return "", "", fmt.Errorf("cannot decode username: %w", err)
}
p, err := base64.StdEncoding.DecodeString(string(c.Data["password"]))
if err != nil {
return "", "", fmt.Errorf("cannot decode password: %w", err)
}

return string(u), string(p), nil
}
29 changes: 19 additions & 10 deletions pkg/controller/webhooks/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func (p *PostgreSQLWebhookHandler) validatePostgreSQL(ctx context.Context, newOb
}

// Validate major upgrades
if err := validateMajorVersionUpgrade(newPg, oldPg); err != nil {
allErrs = append(allErrs, err)
if errList := validateMajorVersionUpgrade(newPg, oldPg); errList != nil {
allErrs = append(allErrs, errList...)
}
}

Expand Down Expand Up @@ -276,37 +276,46 @@ func validatePgConf(pg *vshnv1.VSHNPostgreSQL) field.ErrorList {
return allErrs
}

func validateMajorVersionUpgrade(newPg *vshnv1.VSHNPostgreSQL, oldPg *vshnv1.VSHNPostgreSQL) *field.Error {
func validateMajorVersionUpgrade(newPg *vshnv1.VSHNPostgreSQL, oldPg *vshnv1.VSHNPostgreSQL) (errList field.ErrorList) {

newVersion, err := strconv.Atoi(newPg.Spec.Parameters.Service.MajorVersion)
if err != nil {
return field.Invalid(
errList = append(errList, field.Invalid(
field.NewPath("spec.parameters.service.majorVersion"),
newPg.Spec.Parameters.Service.MajorVersion,
fmt.Sprintf("invalid major version: %s", err.Error()),
)
))
}
var oldVersion int
if oldPg.Status.MajorVersion == "" {
oldVersion = newVersion
} else {
oldVersion, err = strconv.Atoi(oldPg.Status.MajorVersion)
if err != nil {
return field.Invalid(
errList = append(errList, field.Invalid(
field.NewPath("status.majorVersion"),
oldPg.Status.MajorVersion,
fmt.Sprintf("invalid major version: %s", err.Error()),
)
))
}
}

// Check if the upgrade is allowed
if newVersion != oldVersion {
if oldVersion != newVersion-1 {
return field.Forbidden(
errList = append(errList, field.Forbidden(
field.NewPath("spec.parameters.service.majorVersion"),
"only one major version upgrade at a time is allowed",
)
))
}
for _, e := range oldPg.Spec.Parameters.Service.Extensions {
if e.Name == "timescaledb" || e.Name == "postgis" {
errList = append(errList, field.Forbidden(
field.NewPath("spec.parameters.service.majorVersion"),
"major upgrades are not supported for instances with timescaledb or postgis extensions",
))
}
}
}
return nil
return errList
}
Loading

0 comments on commit dfdf9d7

Please sign in to comment.