-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor grpc-server package #598
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package internal | ||
|
||
import ( | ||
"database/sql" | ||
|
||
"github.com/packethost/pkg/log" | ||
"github.com/tinkerbell/tink/db" | ||
) | ||
|
||
// SetupPostgres initializes a connection to a postgres database. | ||
func SetupPostgres(connInfo string, onlyMigrate bool, logger log.Logger) (db.Database, error) { | ||
dbCon, err := sql.Open("postgres", connInfo) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tinkDB := db.Connect(dbCon, logger) | ||
|
||
if onlyMigrate { | ||
logger.Info("Applying migrations. This process will end when migrations will take place.") | ||
numAppliedMigrations, err := tinkDB.Migrate() | ||
if err != nil { | ||
return nil, err | ||
} | ||
logger.With("num_applied_migrations", numAppliedMigrations).Info("Migrations applied successfully") | ||
return nil, nil | ||
} | ||
|
||
numAvailableMigrations, err := tinkDB.CheckRequiredMigrations() | ||
if err != nil { | ||
return nil, err | ||
} | ||
if numAvailableMigrations != 0 { | ||
logger.Info("Your database schema is not up to date. Please apply migrations running tink-server with env var ONLY_MIGRATION set.") | ||
} | ||
return *tinkDB, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,148 +7,87 @@ import ( | |
"net" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" | ||
"github.com/packethost/pkg/log" | ||
"github.com/pkg/errors" | ||
"github.com/tinkerbell/tink/db" | ||
"github.com/tinkerbell/tink/protos/hardware" | ||
"github.com/tinkerbell/tink/protos/template" | ||
"github.com/tinkerbell/tink/protos/workflow" | ||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/reflection" | ||
) | ||
|
||
// Server is the gRPC server for tinkerbell. | ||
type server struct { | ||
cert []byte | ||
modT time.Time | ||
// GetCerts returns a TLS certificate, PEM bytes, and file modification time for a | ||
// given path. An error is returned for any failure. | ||
// | ||
// The public key is expected to be named "bundle.pem" and the private key | ||
// "server.pem". | ||
func GetCerts(certsDir string) (*tls.Certificate, []byte, *time.Time, error) { | ||
certFile, err := os.Open(filepath.Join(certsDir, "bundle.pem")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. G304: Potential file inclusion via variable |
||
if err != nil { | ||
return nil, nil, nil, errors.Wrap(err, "failed to open TLS cert") | ||
} | ||
|
||
db db.Database | ||
quit <-chan struct{} | ||
stat, err := certFile.Stat() | ||
if err != nil { | ||
return nil, nil, nil, errors.Wrap(err, "failed to stat TLS cert") | ||
} | ||
modT := stat.ModTime() | ||
certPEM, err := ioutil.ReadAll(certFile) | ||
if err != nil { | ||
return nil, nil, nil, errors.Wrap(err, "failed to read TLS cert") | ||
} | ||
err = certFile.Close() | ||
if err != nil { | ||
return nil, nil, nil, errors.Wrap(err, "failed to close TLS cert") | ||
} | ||
|
||
dbLock sync.RWMutex | ||
dbReady bool | ||
keyPEM, err := ioutil.ReadFile(filepath.Join(certsDir, "server-key.pem")) | ||
if err != nil { | ||
return nil, nil, nil, errors.Wrap(err, "failed to read TLS key") | ||
} | ||
|
||
watchLock sync.RWMutex | ||
watch map[string]chan string | ||
cert, err := tls.X509KeyPair(certPEM, keyPEM) | ||
if err != nil { | ||
return nil, nil, nil, errors.Wrap(err, "failed to parse TLS file content") | ||
} | ||
|
||
logger log.Logger | ||
return &cert, certPEM, &modT, nil | ||
} | ||
|
||
type ConfigGRPCServer struct { | ||
micahhausler marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Facility string | ||
TLSCert string | ||
GRPCAuthority string | ||
DB db.Database | ||
// Registrar is an interface for registering APIs on a gRPC server. | ||
type Registrar interface { | ||
Register(*grpc.Server) | ||
} | ||
|
||
// SetupGRPC setup and return a gRPC server. | ||
func SetupGRPC(ctx context.Context, logger log.Logger, config *ConfigGRPCServer, errCh chan<- error) ([]byte, time.Time) { | ||
// SetupGRPC opens a listener and serves a given Registrar's APIs on a gRPC server | ||
// and returns the listener's address or an error. | ||
func SetupGRPC(ctx context.Context, r Registrar, listenAddr string, opts []grpc.ServerOption, errCh chan<- error) (serverAddr string, err error) { | ||
params := []grpc.ServerOption{ | ||
grpc_middleware.WithUnaryServerChain(grpc_prometheus.UnaryServerInterceptor, otelgrpc.UnaryServerInterceptor()), | ||
grpc_middleware.WithStreamServerChain(grpc_prometheus.StreamServerInterceptor, otelgrpc.StreamServerInterceptor()), | ||
} | ||
server := &server{ | ||
db: config.DB, | ||
dbReady: true, | ||
logger: logger, | ||
} | ||
cert := config.TLSCert | ||
switch cert { | ||
case "insecure": | ||
// server.cert *must* be nil, which it is because that is the default value | ||
// server.modT doesn't matter | ||
case "": | ||
tlsCert, certPEM, modT := getCerts(config.Facility, logger) | ||
params = append(params, grpc.Creds(credentials.NewServerTLSFromCert(&tlsCert))) | ||
server.cert = certPEM | ||
server.modT = modT | ||
default: | ||
server.cert = []byte(cert) | ||
server.modT = time.Now() | ||
} | ||
params = append(params, opts...) | ||
|
||
// register servers | ||
s := grpc.NewServer(params...) | ||
template.RegisterTemplateServiceServer(s, server) | ||
workflow.RegisterWorkflowServiceServer(s, server) | ||
hardware.RegisterHardwareServiceServer(s, server) | ||
r.Register(s) | ||
reflection.Register(s) | ||
|
||
grpc_prometheus.Register(s) | ||
|
||
go func() { | ||
lis, err := net.Listen("tcp", config.GRPCAuthority) | ||
if err != nil { | ||
err = errors.Wrap(err, "failed to listen") | ||
logger.Error(err) | ||
panic(err) | ||
} | ||
|
||
errCh <- s.Serve(lis) | ||
}() | ||
|
||
go func() { | ||
<-ctx.Done() | ||
s.GracefulStop() | ||
}() | ||
return server.cert, server.modT | ||
} | ||
|
||
func getCerts(facility string, logger log.Logger) (tls.Certificate, []byte, time.Time) { | ||
var ( | ||
certPEM []byte | ||
modT time.Time | ||
) | ||
|
||
certsDir := os.Getenv("TINKERBELL_CERTS_DIR") | ||
if certsDir == "" { | ||
certsDir = "/certs/" + facility | ||
} | ||
if !strings.HasSuffix(certsDir, "/") { | ||
certsDir += "/" | ||
} | ||
|
||
certFile, err := os.Open(filepath.Clean(certsDir + "bundle.pem")) | ||
lis, err := net.Listen("tcp", listenAddr) | ||
if err != nil { | ||
err = errors.Wrap(err, "failed to open TLS cert") | ||
logger.Error(err) | ||
panic(err) | ||
return "", errors.Wrap(err, "failed to listen") | ||
} | ||
|
||
if stat, err := certFile.Stat(); err != nil { | ||
err = errors.Wrap(err, "failed to stat TLS cert") | ||
logger.Error(err) | ||
panic(err) | ||
} else { | ||
modT = stat.ModTime() | ||
} | ||
go func(errChan chan<- error) { | ||
errChan <- s.Serve(lis) | ||
}(errCh) | ||
|
||
certPEM, err = ioutil.ReadAll(certFile) | ||
if err != nil { | ||
err = errors.Wrap(err, "failed to read TLS cert") | ||
logger.Error(err) | ||
panic(err) | ||
} | ||
keyPEM, err := ioutil.ReadFile(filepath.Clean(certsDir + "server-key.pem")) | ||
if err != nil { | ||
err = errors.Wrap(err, "failed to read TLS key") | ||
logger.Error(err) | ||
panic(err) | ||
} | ||
go func(ctx context.Context, s *grpc.Server) { | ||
<-ctx.Done() | ||
s.GracefulStop() | ||
}(ctx, s) | ||
|
||
cert, err := tls.X509KeyPair(certPEM, keyPEM) | ||
if err != nil { | ||
err = errors.Wrap(err, "failed to ingest TLS files") | ||
logger.Error(err) | ||
panic(err) | ||
} | ||
return cert, certPEM, modT | ||
return lis.Addr().String(), nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could really benefit from using https://github.com/oklog/run. Unnecessary for this PR, but if you get time separately it would help clean up the subsequent error channel code that does static draining of the channel.