Skip to content

Commit

Permalink
chore: add email receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
ibuildthecloud committed Nov 17, 2024
1 parent a0a7c7c commit de30620
Show file tree
Hide file tree
Showing 21 changed files with 495 additions and 34 deletions.
5 changes: 3 additions & 2 deletions apiclient/types/emailreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package types
type EmailReceiver struct {
Metadata
EmailReceiverManifest
AliasAssigned bool `json:"aliasAssigned,omitempty"`
AddressAssigned bool `json:"aliasAssigned,omitempty"`
EmailAddress string `json:"emailAddress,omitempty"`
}

type EmailReceiverManifest struct {
Name string `json:"name"`
Description string `json:"description"`
Alias string `json:"alias"`
User string `json:"user,omitempty"`
Workflow string `json:"workflow"`
AllowedSenders []string `json:"allowedSenders,omitempty"`
}
Expand Down
18 changes: 10 additions & 8 deletions apiclient/types/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ func (in WorkflowState) IsTerminal() bool {
type Thread struct {
Metadata
ThreadManifest
AgentID string `json:"agentID,omitempty"`
WorkflowID string `json:"workflowID,omitempty"`
State string `json:"state,omitempty"`
LastRunID string `json:"lastRunID,omitempty"`
CurrentRunID string `json:"currentRunID,omitempty"`
ParentThreadID string `json:"parentThreadID,omitempty"`
UserID string `json:"userID,omitempty"`
AgentAlias string `json:"agentAlias,omitempty"`
AgentID string `json:"agentID,omitempty"`
WorkflowID string `json:"workflowID,omitempty"`
WebhookID string `json:"webhookID,omitempty"`
EmailReceiverID string `json:"emailReceiverID,omitempty"`
State string `json:"state,omitempty"`
LastRunID string `json:"lastRunID,omitempty"`
CurrentRunID string `json:"currentRunID,omitempty"`
ParentThreadID string `json:"parentThreadID,omitempty"`
UserID string `json:"userID,omitempty"`
AgentAlias string `json:"agentAlias,omitempty"`
}

type ThreadList List[Thread]
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ require (
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/mbland/hmacauth v0.0.0-20170912233209-44256dfd4bfa // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/mhale/smtpd v0.8.3 // indirect
github.com/mholt/archiver/v4 v4.0.0-alpha.8 // indirect
github.com/microcosm-cc/bluemonday v1.0.27 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,8 @@ github.com/mbland/hmacauth v0.0.0-20170912233209-44256dfd4bfa/go.mod h1:8vxFeeg+
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/mhale/smtpd v0.8.3 h1:8j8YNXajksoSLZja3HdwvYVZPuJSqAxFsib3adzRRt8=
github.com/mhale/smtpd v0.8.3/go.mod h1:MQl+y2hwIEQCXtNhe5+55n0GZOjSmeqORDIXbqUL3x4=
github.com/mholt/archiver/v4 v4.0.0-alpha.8 h1:tRGQuDVPh66WCOelqe6LIGh0gwmfwxUrSSDunscGsRM=
github.com/mholt/archiver/v4 v4.0.0-alpha.8/go.mod h1:5f7FUYGXdJWUjESffJaYR4R60VhnHxb2X3T1teMyv5A=
github.com/microcosm-cc/bluemonday v1.0.27 h1:MpEUotklkwCSLeH+Qdx1VJgNqLlpY2KXwXFM08ygZfk=
Expand Down
2 changes: 1 addition & 1 deletion pkg/alias/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func Get(ctx context.Context, c kclient.Client, obj v1.Aliasable, namespace stri
}

var alias v1.Alias
if err := c.Get(ctx, router.Key(namespace, Key(gvk, obj, name)), &alias); apierrors.IsNotFound(err) {
if err := c.Get(ctx, router.Key("", Key(gvk, obj, name)), &alias); apierrors.IsNotFound(err) {
return errLookup
} else if err != nil {
return errors.Join(errLookup, err)
Expand Down
123 changes: 123 additions & 0 deletions pkg/api/handlers/emailreceiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package handlers

import (
"net/http"

"github.com/otto8-ai/otto8/apiclient/types"
"github.com/otto8-ai/otto8/pkg/alias"
"github.com/otto8-ai/otto8/pkg/api"
v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1"
"github.com/otto8-ai/otto8/pkg/system"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type EmailReceiverHandler struct {
hostname string
}

func NewEmailReceiverHandler(hostname string) *EmailReceiverHandler {
return &EmailReceiverHandler{
hostname: hostname,
}
}

func (e *EmailReceiverHandler) Update(req api.Context) error {
var (
id = req.PathValue("id")
er v1.EmailReceiver
)

if err := alias.Get(req.Context(), req.Storage, &er, req.Namespace(), id); err != nil {
return err
}

var manifest types.EmailReceiverManifest
if err := req.Read(&manifest); err != nil {
return err
}

er.Spec.EmailReceiverManifest = manifest

if err := req.Update(&er); err != nil {
return err
}

return req.Write(convertEmailReceiver(er, e.hostname))
}

func (e *EmailReceiverHandler) Delete(req api.Context) error {
var (
id = req.PathValue("id")
)

return req.Delete(&v1.EmailReceiver{
ObjectMeta: metav1.ObjectMeta{
Name: id,
Namespace: req.Namespace(),
},
})
}

func (e *EmailReceiverHandler) Create(req api.Context) error {
var manifest types.EmailReceiverManifest
if err := req.Read(&manifest); err != nil {
return err
}

er := v1.EmailReceiver{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.EmailReceiverPrefix,
Namespace: req.Namespace(),
},
Spec: v1.EmailReceiverSpec{
EmailReceiverManifest: manifest,
},
}

if err := req.Create(&er); err != nil {
return err
}

req.WriteHeader(http.StatusCreated)
return req.Write(convertEmailReceiver(er, e.hostname))
}

func convertEmailReceiver(emailReceiver v1.EmailReceiver, hostname string) *types.EmailReceiver {
manifest := emailReceiver.Spec.EmailReceiverManifest
er := &types.EmailReceiver{
Metadata: MetadataFrom(&emailReceiver),
EmailReceiverManifest: manifest,
AddressAssigned: emailReceiver.Status.AliasAssigned,
}
if hostname != "" && er.AddressAssigned {
er.EmailAddress = emailReceiver.Spec.User + "@" + hostname
}
return er
}

func (e *EmailReceiverHandler) ByID(req api.Context) error {
var (
er v1.EmailReceiver
id = req.PathValue("id")
)

if err := alias.Get(req.Context(), req.Storage, &er, req.Namespace(), id); err != nil {
return err
}

return req.Write(convertEmailReceiver(er, e.hostname))
}

func (e *EmailReceiverHandler) List(req api.Context) error {
var emailReceiverList v1.EmailReceiverList
if err := req.List(&emailReceiverList); err != nil {
return err
}

var resp types.EmailReceiverList
for _, er := range emailReceiverList.Items {
resp.Items = append(resp.Items, *convertEmailReceiver(er, e.hostname))
}

return req.Write(resp)
}
18 changes: 10 additions & 8 deletions pkg/api/handlers/threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ func convertThread(thread v1.Thread) types.Thread {
Description: thread.Spec.Manifest.Description,
Tools: thread.Spec.Manifest.Tools,
},
AgentID: thread.Spec.AgentName,
WorkflowID: thread.Spec.WorkflowName,
LastRunID: thread.Status.LastRunName,
CurrentRunID: thread.Status.CurrentRunName,
State: state,
ParentThreadID: parent,
AgentAlias: thread.Spec.AgentAlias,
UserID: thread.Spec.UserUID,
AgentID: thread.Spec.AgentName,
WorkflowID: thread.Spec.WorkflowName,
WebhookID: thread.Spec.WebhookName,
EmailReceiverID: thread.Spec.EmailReceiverName,
LastRunID: thread.Status.LastRunName,
CurrentRunID: thread.Status.CurrentRunName,
State: state,
ParentThreadID: parent,
AgentAlias: thread.Spec.AgentAlias,
UserID: thread.Spec.UserUID,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/handlers/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (a *WebhookHandler) List(req api.Context) error {

func (a *WebhookHandler) Execute(req api.Context) error {
var webhook v1.Webhook
if err := alias.Get(req.Context(), req.Storage, &webhook, req.Namespace(), req.PathValue("id")); err != nil {
if err := alias.Get(req.Context(), req.Storage, &webhook, "", req.PathValue("id")); err != nil {
return err
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/api/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func Router(services *services.Services) (http.Handler, error) {
cronJobs := handlers.NewCronJobHandler()
models := handlers.NewModelHandler()
prompt := handlers.NewPromptHandler(services.GPTClient)
emailreceiver := handlers.NewEmailReceiverHandler(services.EmailServerName)

// Version
mux.HandleFunc("GET /api/version", handlers.GetVersion)
Expand Down Expand Up @@ -172,6 +173,17 @@ func Router(services *services.Services) (http.Handler, error) {
mux.HandleFunc("PUT /api/webhooks/{id}", webhooks.Update)
mux.HandleFunc("POST /api/webhooks/{id}", webhooks.Execute)

// Email Receivers
mux.HandleFunc("POST /api/email-receivers", emailreceiver.Create)
mux.HandleFunc("GET /api/email-receivers", emailreceiver.List)
mux.HandleFunc("GET /api/email-receivers/{id}", emailreceiver.ByID)
mux.HandleFunc("DELETE /api/email-receivers/{id}", emailreceiver.Delete)
mux.HandleFunc("PUT /api/email-receivers/{id}", emailreceiver.Update)

// Email Receivers for generic create
mux.HandleFunc("POST /api/emailreceivers", emailreceiver.Create)
mux.HandleFunc("GET /api/emailreceivers/{id}", emailreceiver.ByID)

// CronJobs
mux.HandleFunc("POST /api/cronjobs", cronJobs.Create)
mux.HandleFunc("GET /api/cronjobs", cronJobs.List)
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,13 @@ func (l *Create) fromTemplate(ctx context.Context) (string, error) {
"Agent",
"Workflow",
"Webhook",
"Email Receiver",
}).Show()
if err != nil {
return "", err
}

template, err := templates.FS.ReadFile(strings.ToLower(sel) + ".yaml")
template, err := templates.FS.ReadFile(strings.ToLower(strings.ReplaceAll(sel, " ", "") + ".yaml"))
if err != nil {
return "", err
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/cli/emailreceiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package cli

import (
"fmt"

"github.com/dustin/go-humanize"
"github.com/spf13/cobra"
)

type EmailReceivers struct {
root *Otto8
Quiet bool `usage:"Only print IDs of agents" short:"q"`
Wide bool `usage:"Print more information" short:"w"`
Output string `usage:"Output format (table, json, yaml)" short:"o" default:"table"`
}

func (l *EmailReceivers) Customize(cmd *cobra.Command) {
cmd.Aliases = []string{"emailreceiver", "er"}
}

func (l *EmailReceivers) Run(cmd *cobra.Command, _ []string) error {
ers, err := l.root.Client.ListEmailReceivers(cmd.Context())
if err != nil {
return err
}

if ok, err := output(l.Output, ers); ok || err != nil {
return err
}

if l.Quiet {
for _, emailReceiver := range ers.Items {
fmt.Println(emailReceiver.ID)
}
return nil
}

w := newTable("ID", "NAME", "DESCRIPTION", "WORKFLOW", "ADDRESS", "CREATED")
for _, er := range ers.Items {
w.WriteRow(er.ID, er.Name, truncate(er.Description, l.Wide), er.Workflow,
er.EmailAddress,
humanize.Time(er.Created.Time))
}

return w.Err()
}
6 changes: 6 additions & 0 deletions pkg/cli/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ func (l *Delete) Run(cmd *cobra.Command, args []string) error {
} else {
fmt.Printf("Webhook deleted: %s\n", id)
}
case system.IsEmailReceiverID(id):
if err := l.root.Client.DeleteEmailReceiver(cmd.Context(), id); err != nil {
errs = append(errs, err)
} else {
fmt.Printf("Email receiver deleted: %s\n", id)
}
default:
errs = append(errs, errors.New("invalid ID: "+id))
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/cli/templates/emailreceiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Required to be set to webhook
type: emailreceiver

# A display name for you webhook
name: "%NAME%"

# A description of your webhook
description: ""

# Required the user portion of the [email protected] email address that will be created
user: ""

# The workflow ID or workflow alias that this email receiver will trigger
workflow: ""
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (h *Handler) newThread(ctx context.Context, c kclient.Client, wf *v1.Workfl
WorkflowName: we.Spec.WorkflowName,
WorkflowExecutionName: we.Name,
WebhookName: we.Spec.WebhookName,
EmailReceiverName: we.Spec.EmailReceiverName,
CronJobName: we.Spec.CronJobName,
FromWorkspaceNames: []string{workspaceName},
},
Expand Down
8 changes: 8 additions & 0 deletions pkg/services/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/otto8-ai/otto8/pkg/invoke"
"github.com/otto8-ai/otto8/pkg/jwt"
"github.com/otto8-ai/otto8/pkg/proxy"
"github.com/otto8-ai/otto8/pkg/smtp"
"github.com/otto8-ai/otto8/pkg/storage"
"github.com/otto8-ai/otto8/pkg/storage/scheme"
"github.com/otto8-ai/otto8/pkg/storage/services"
Expand Down Expand Up @@ -60,6 +61,7 @@ type Config struct {
AWSKMSKeyARN string `usage:"The ARN of the AWS KMS key to use for encrypting credential storage" env:"OTTO8_AWS_KMS_KEY_ARN" name:"aws-kms-key-arn"`
EncryptionConfigFile string `usage:"The path to the encryption configuration file" default:"./encryption.yaml"`
KnowledgeSetIngestionLimit int `usage:"The maximum number of files to ingest into a knowledge set" default:"1000" env:"OTTO_KNOWLEDGESET_INGESTION_LIMIT" name:"knowledge-set-ingestion-limit"`
EmailServerName string `usage:"The name of the email server to display for email receivers (default: ui-hostname value)"`

AuthConfig
GatewayConfig
Expand All @@ -70,6 +72,7 @@ type Services struct {
ToolRegistryURL string
WorkspaceProviderType string
ServerURL string
EmailServerName string
DevUIPort int
Events *events.Emitter
StorageClient storage.Client
Expand Down Expand Up @@ -258,6 +261,10 @@ func New(ctx context.Context, config Config) (*Services, error) {
authenticators = union.New(authenticators, authn.NoAuth{})
}

if config.EmailServerName != "" {
go smtp.Start(ctx, storageClient, config.EmailServerName)
}

// For now, always auto-migrate the gateway database
return &Services{
WorkspaceProviderType: config.WorkspaceProviderType,
Expand All @@ -275,6 +282,7 @@ func New(ctx context.Context, config Config) (*Services, error) {
GatewayServer: gatewayServer,
ProxyServer: proxyServer,
KnowledgeSetIngestionLimit: config.KnowledgeSetIngestionLimit,
EmailServerName: config.EmailServerName,
}, nil
}

Expand Down
Loading

0 comments on commit de30620

Please sign in to comment.