Skip to content

Commit

Permalink
Add Kubernetes Resource Model to Tink API
Browse files Browse the repository at this point in the history
Signed-off-by: Micah Hausler <[email protected]>
  • Loading branch information
micahhausler committed Mar 3, 2022
1 parent 2326b77 commit a82ae63
Show file tree
Hide file tree
Showing 9 changed files with 1,021 additions and 86 deletions.
105 changes: 65 additions & 40 deletions cmd/tink-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ var version = "devel"
// DaemonConfig represents all the values you can configure as part of the tink-server.
// You can change the configuration via environment variable, or file, or command flags.
type DaemonConfig struct {
Facility string
PGDatabase string
PGUSer string
PGPassword string
PGSSLMode string
OnlyMigration bool
GRPCAuthority string
TLSCert string
CertDir string
HTTPAuthority string
TLS bool
Facility string
PGDatabase string
PGUSer string
PGPassword string
PGSSLMode string
OnlyMigration bool
GRPCAuthority string
TLSCert string
CertDir string
HTTPAuthority string
TLS bool
KubernetesResourceModel bool
KubeconfigPath string
}

func (c *DaemonConfig) AddFlags(fs *pflag.FlagSet) {
Expand All @@ -52,6 +54,8 @@ func (c *DaemonConfig) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&c.CertDir, "cert-dir", "", "")
fs.StringVar(&c.HTTPAuthority, "http-authority", ":42114", "The address used to expose the HTTP server")
fs.BoolVar(&c.TLS, "tls", true, "Run in tls protected mode (disabling should only be done for development or if behind TLS terminating proxy)")
fs.BoolVar(&c.KubernetesResourceModel, "kubernetes-backend", false, "Feature flag to use the Kubernetes Resource Model")
fs.StringVar(&c.KubeconfigPath, "kubeconfig", "", "The path to the Kubeconfig. Only takes effect if `--kubernetes-backend=true`")
}

func (c *DaemonConfig) PopulateFromLegacyEnvVar() {
Expand Down Expand Up @@ -120,46 +124,23 @@ func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command {
// figure this out in another PR
errCh := make(chan error, 2)

// TODO(gianarb): I moved this up because we need to be sure that both
// connection, the one used for the resources and the one used for
// listening to events and notification are coming in the same way.
// BUT we should be using the right flags
connInfo := fmt.Sprintf("dbname=%s user=%s password=%s sslmode=%s",
config.PGDatabase,
config.PGUSer,
config.PGPassword,
config.PGSSLMode,
)

dbCon, err := sql.Open("postgres", connInfo)
var dbSetup dbSetupFunc = setupPostgres
if config.KubernetesResourceModel {
dbSetup = setupKubernetes
}
database, err := dbSetup(config, logger)
if err != nil {
return err
}
tinkDB := db.Connect(dbCon, logger)

if config.OnlyMigration {
logger.Info("Applying migrations. This process will end when migrations will take place.")
numAppliedMigrations, err := tinkDB.Migrate()
if err != nil {
return err
}
logger.With("num_applied_migrations", numAppliedMigrations).Info("Migrations applied successfully")
return nil
}

numAvailableMigrations, err := tinkDB.CheckRequiredMigrations()
if err != nil {
return err
}
if numAvailableMigrations != 0 {
logger.Info("Your database schema is not up to date. Please apply migrations running tink-server with env var ONLY_MIGRATION set.")
}

grpcConfig := &grpcServer.ConfigGRPCServer{
Facility: config.Facility,
TLSCert: "insecure",
GRPCAuthority: config.GRPCAuthority,
DB: tinkDB,
DB: database,
}
if config.TLS {
grpcConfig.TLSCert = config.TLSCert
Expand Down Expand Up @@ -197,6 +178,50 @@ func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command {
return cmd
}

type dbSetupFunc func(*DaemonConfig, log.Logger) (db.Database, error)

func setupPostgres(config *DaemonConfig, logger log.Logger) (db.Database, error) {
// TODO(gianarb): I moved this up because we need to be sure that both
// connection, the one used for the resources and the one used for
// listening to events and notification are coming in the same way.
// BUT we should be using the right flags
connInfo := fmt.Sprintf("dbname=%s user=%s password=%s sslmode=%s",
config.PGDatabase,
config.PGUSer,
config.PGPassword,
config.PGSSLMode,
)

dbCon, err := sql.Open("postgres", connInfo)
if err != nil {
return nil, err
}
tinkDB := db.Connect(dbCon, logger)

if config.OnlyMigration {
logger.Info("Applying migrations. This process will end when migrations will take place.")
numAppliedMigrations, err := tinkDB.Migrate()
if err != nil {
return nil, err
}
logger.With("num_applied_migrations", numAppliedMigrations).Info("Migrations applied successfully")
return nil, nil
}

numAvailableMigrations, err := tinkDB.CheckRequiredMigrations()
if err != nil {
return nil, err
}
if numAvailableMigrations != 0 {
logger.Info("Your database schema is not up to date. Please apply migrations running tink-server with env var ONLY_MIGRATION set.")
}
return *tinkDB, nil
}

func setupKubernetes(config *DaemonConfig, logger log.Logger) (db.Database, error) {
return db.NewK8sDatabase(config.KubeconfigPath, logger)
}

func createViper(logger log.Logger) (*viper.Viper, error) {
v := viper.New()
v.AutomaticEnv()
Expand Down
211 changes: 211 additions & 0 deletions db/k8s_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package db

import (
"context"
"errors"
"fmt"
"time"

"github.com/packethost/pkg/log"
"github.com/tinkerbell/tink/pkg/apis/core/v1alpha1"
"github.com/tinkerbell/tink/pkg/controllers"
"github.com/tinkerbell/tink/pkg/convert"
pb "github.com/tinkerbell/tink/protos/workflow"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type k8sDB struct {
Database // shadow the DB interface but leave unimplemented

logger log.Logger
clientFunc func() client.Client

nowFunc func() time.Time
}

func NewK8sDatabase(kubeconfig string, logger log.Logger) (Database, error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}
manager := controllers.NewManagerOrDie(config, controllers.GetServerOptions())
go func() {
err := manager.Start(context.Background())
if err != nil {
logger.Error(err, "Error starting manager")
}
}()
return &k8sDB{
logger: logger,
clientFunc: manager.GetClient,
}, nil
}

// InsertIntoWfDataTable is deprecated, but still callable. Returning a nil error.
func (d *k8sDB) InsertIntoWfDataTable(_ context.Context, _ *pb.UpdateWorkflowDataRequest) error {
return nil
}

// GetfromWfDataTable is deprecated, but still callable. returning an emtyp byte slice and error.
func (d *k8sDB) GetfromWfDataTable(_ context.Context, _ *pb.GetWorkflowDataRequest) ([]byte, error) {
return []byte("{}"), nil
}

func (d *k8sDB) GetWorkflowsForWorker(ctx context.Context, id string) ([]string, error) {
stored := &v1alpha1.WorkflowList{}
err := d.clientFunc().List(ctx, stored, &client.MatchingFields{
controllers.WorkerAddr: id,
})
if err != nil {
return nil, err
}
wfNames := []string{}
for _, wf := range stored.Items {
// workaround for bug in controller runtime fake client: it reports an empty object without a name
if len(wf.Name) > 0 {
wfNames = append(wfNames, wf.Name)
}
}

return wfNames, nil
}

func (d *k8sDB) getWorkflowByName(ctx context.Context, id string) (*v1alpha1.Workflow, error) {
workflow := &v1alpha1.Workflow{}
err := d.clientFunc().Get(ctx, types.NamespacedName{Name: id, Namespace: "default"}, workflow)
if err != nil {
d.logger.Error(err, "could not find workflow named ", id)
return nil, err
}
return workflow, nil
}

// UpdateWorkflowState processes an workflow change from a worker.
func (d *k8sDB) UpdateWorkflowState(ctx context.Context, wfContext *pb.WorkflowContext) error {
wf, err := d.getWorkflowByName(ctx, wfContext.WorkflowId)
if err != nil {
return err
}
stored := wf.DeepCopy()
err = d.updateWorkflowState(wf, wfContext)
if err != nil {
return err
}
return d.clientFunc().Status().Patch(ctx, wf, client.MergeFrom(stored))
}

func (d *k8sDB) updateWorkflowState(wf *v1alpha1.Workflow, wfContext *pb.WorkflowContext) error {
if wf == nil {
return errors.New("no workflow provided")
}
if wfContext == nil {
return errors.New("no workflow context provided")
}
var (
taskIndex int = -1
actionIndex int = -1
)

for ti, task := range wf.Status.Tasks {
if wfContext.CurrentTask == task.Name {
for ai, action := range task.Actions {
if action.Name == wfContext.CurrentAction && wfContext.CurrentActionIndex == int64(ai) {
taskIndex = ti
actionIndex = ai
goto cont
}
}
}
}
cont:

if taskIndex < 0 {
return errors.New("task not found")
}
if actionIndex < 0 {
return errors.New("action not found")
}

d.logger.Info(fmt.Sprintf("Updating taskIndex %d action index %d with value: %#v ", taskIndex, actionIndex, wf.Status.Tasks[taskIndex].Actions[actionIndex]))
wf.Status.Tasks[taskIndex].Actions[actionIndex].Status = pb.State_name[int32(wfContext.CurrentActionState)]

switch wfContext.CurrentActionState {
case pb.State_STATE_RUNNING:
// Workflow is running, so set the start time to now
wf.Status.State = pb.State_name[int32(wfContext.CurrentActionState)]
wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt = func() *metav1.Time {
t := metav1.NewTime(d.nowFunc())
return &t
}()
case pb.State_STATE_FAILED:
case pb.State_STATE_TIMEOUT:
// Handle terminal statuses by updating the workflow state and time
wf.Status.State = pb.State_name[int32(wfContext.CurrentActionState)]
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(d.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
case pb.State_STATE_SUCCESS:
// Handle a success by marking the task as complete
if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil {
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(d.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
}
// Mark success on last action success
if wfContext.CurrentActionIndex+1 == wfContext.TotalNumberOfActions {
wf.Status.State = pb.State_name[int32(wfContext.CurrentActionState)]
}
case pb.State_STATE_PENDING:
// This is probably a client bug?
return errors.New("no update requested")
}
return nil
}

// GetWorkflowContexts returns the WorkflowContexts for a given workflow.
func (d *k8sDB) GetWorkflowContexts(ctx context.Context, wfID string) (*pb.WorkflowContext, error) {
wf, err := d.getWorkflowByName(ctx, wfID)
if err != nil {
return nil, err
}

var (
found bool
taskIndex int
taskActionIndex int
actionIndex int
actionCount int
)
for ti, task := range wf.Status.Tasks {
for ai, action := range task.Actions {
actionCount++
// Find the first action in a non-terminal state
if (action.Status == pb.State_name[int32(pb.State_STATE_PENDING)] || action.Status == pb.State_name[int32(pb.State_STATE_RUNNING)]) && !found {
taskIndex = ti
actionIndex = ai
found = true
}
if !found {
actionIndex++
}
}
}

resp := &pb.WorkflowContext{
WorkflowId: wfID,
CurrentWorker: wf.Status.Tasks[taskIndex].WorkerAddr,
CurrentTask: wf.Status.Tasks[taskIndex].Name,
CurrentAction: wf.Status.Tasks[taskIndex].Actions[taskActionIndex].Name,
CurrentActionIndex: int64(actionIndex),
CurrentActionState: pb.State(pb.State_value[wf.Status.Tasks[taskIndex].Actions[taskActionIndex].Status]),
TotalNumberOfActions: int64(actionCount),
}
return resp, nil
}

func (d *k8sDB) GetWorkflowActions(ctx context.Context, wfID string) (*pb.WorkflowActionList, error) {
wf, err := d.getWorkflowByName(ctx, wfID)
if err != nil {
return nil, err
}
return convert.WorkflowActionListCRDToProto(wf), nil
}
Loading

0 comments on commit a82ae63

Please sign in to comment.