Skip to content

Commit

Permalink
Filter jobexecutions and crawlexecutions on start time (to and from)
Browse files Browse the repository at this point in the history
 - Remove log.Fatal calls to gracefully close connections
  • Loading branch information
maeb committed Mar 31, 2022
1 parent 890095e commit e5dd1c2
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 244 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ require (
github.com/pelletier/go-toml v1.8.1 // indirect
github.com/pkg/errors v0.9.1
github.com/pquerna/cachecontrol v0.0.0-20200921180117-858c6e7e6b7e // indirect
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 // indirect
github.com/sirupsen/logrus v1.7.0
github.com/spf13/afero v1.4.1 // indirect
github.com/spf13/cast v1.3.1 // indirect
Expand Down
54 changes: 1 addition & 53 deletions go.sum

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions src/apiutil/apiutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package apiutil

import (
import (
commonsV1 "github.com/nlnwa/veidemann-api/go/commons/v1"
configV1 "github.com/nlnwa/veidemann-api/go/config/v1"
frontierV1 "github.com/nlnwa/veidemann-api/go/frontier/v1"
Expand Down Expand Up @@ -177,12 +177,15 @@ func TestCreateTemplateFilter(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotMask, gotTemplate, err := CreateTemplateFilter(tt.args.filterString, tt.args.templateObj)
gotMask := new(commonsV1.FieldMask)
gotTemplate := tt.args.templateObj

err := CreateTemplateFilter(tt.args.filterString, gotTemplate, gotMask)
if (err != nil) != tt.wantErr {
t.Errorf("CreateTemplateFilter() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !proto.Equal(gotMask, tt.wantMask) {
if tt.wantMask != nil && !proto.Equal(gotMask, tt.wantMask) {
t.Errorf("CreateTemplateFilter() gotMask = %v, wantMask %v", gotMask, tt.wantMask)
}
if !proto.Equal(gotTemplate, tt.wantTemplate) {
Expand Down
133 changes: 82 additions & 51 deletions src/cmd/reports/crawlexecution.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import (
"github.com/nlnwa/veidemannctl/src/connection"
"github.com/nlnwa/veidemannctl/src/format"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/protobuf/types/known/timestamppb"
"io"
"time"
)

var crawlExecFlags struct {
Expand All @@ -36,91 +39,119 @@ var crawlExecFlags struct {
file string
orderByPath string
orderDesc bool
to *time.Time
from *time.Time
watch bool
states []string
}

// crawlexecutionCmd represents the crawlexecution command
var crawlexecutionCmd = &cobra.Command{
Use: "crawlexecution",
Short: "Get current status for crawl executions",
Long: `Get current status for crawl executions.`,
RunE: func(cmd *cobra.Command, args []string) error {
client, conn := connection.NewReportClient()
defer conn.Close()

var ids []string
func newCrawlExecutionCmd() *cobra.Command {
var crawlexecutionCmd = &cobra.Command{
Use: "crawlexecution",
Short: "Get current status for crawl executions",
Long: `Get current status for crawl executions.`,
PreRunE: func(cmd *cobra.Command, args []string) error {
v := viper.New()

if err := v.BindPFlag("to", cmd.Flag("to")); err != nil {
return fmt.Errorf("failed to bind flag: %w", err)
} else if v.IsSet("to") {
to := v.GetTime("to")
crawlExecFlags.to = &to
}
if err := v.BindPFlag("from", cmd.Flag("from")); err != nil {
return fmt.Errorf("failed to bind flag: %w", err)
} else if v.IsSet("from") {
from := v.GetTime("from")
crawlExecFlags.from = &from
}
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
client, conn := connection.NewReportClient()
defer conn.Close()

if len(args) > 0 {
ids = args
}
var ids []string

request, err := CreateCrawlExecutionsListRequest(ids)
if err != nil {
return fmt.Errorf("failed creating request: %w", err)
}
if len(args) > 0 {
ids = args
}

cmd.SilenceUsage = true
request, err := createCrawlExecutionsListRequest(ids)
if err != nil {
return fmt.Errorf("failed creating request: %w", err)
}

r, err := client.ListExecutions(context.Background(), request)
if err != nil {
return fmt.Errorf("error from controller: %w", err)
}
out, err := format.ResolveWriter(crawlExecFlags.file)
if err != nil {
return fmt.Errorf("could not resolve output '%s': %w", crawlExecFlags.file, err)
}
s, err := format.NewFormatter("CrawlExecutionStatus", out, crawlExecFlags.format, crawlExecFlags.goTemplate)
if err != nil {
return err
}
defer s.Close()
cmd.SilenceUsage = true

for {
msg, err := r.Recv()
if err == io.EOF {
break
r, err := client.ListExecutions(context.Background(), request)
if err != nil {
return fmt.Errorf("error from controller: %w", err)
}
out, err := format.ResolveWriter(crawlExecFlags.file)
if err != nil {
return err
return fmt.Errorf("could not resolve output '%s': %w", crawlExecFlags.file, err)
}
if err := s.WriteRecord(msg); err != nil {
s, err := format.NewFormatter("CrawlExecutionStatus", out, crawlExecFlags.format, crawlExecFlags.goTemplate)
if err != nil {
return err
}
}
return nil
},
}

func init() {
defer s.Close()

for {
msg, err := r.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
if err := s.WriteRecord(msg); err != nil {
return err
}
}
return nil
},
}
crawlexecutionCmd.Flags().Int32VarP(&crawlExecFlags.pageSize, "pagesize", "s", 10, "Number of objects to get")
crawlexecutionCmd.Flags().Int32VarP(&crawlExecFlags.page, "page", "p", 0, "The page number")
crawlexecutionCmd.Flags().StringVarP(&crawlExecFlags.format, "output", "o", "table", "Output format (table|wide|json|yaml|template|template-file)")
crawlexecutionCmd.Flags().StringVarP(&crawlExecFlags.goTemplate, "template", "t", "", "A Go template used to format the output")
crawlexecutionCmd.Flags().StringSliceVarP(&crawlExecFlags.filters, "filter", "q", nil, "Filter objects by field (i.e. meta.description=foo")
crawlexecutionCmd.Flags().StringSliceVar(&crawlExecFlags.states, "state", nil, "Filter objects by state(s)")
crawlexecutionCmd.Flags().StringSliceVar(&crawlExecFlags.states, "state", nil, "Filter objects by state. Valid states are UNDEFINED, FETCHING, SLEEPING, FINISHED or FAILED")
crawlexecutionCmd.Flags().StringVarP(&crawlExecFlags.file, "filename", "f", "", "File name to write to")
crawlexecutionCmd.Flags().StringVar(&crawlExecFlags.orderByPath, "order-by", "", "Order by path")
crawlexecutionCmd.Flags().String("to", "", "To start time")
crawlexecutionCmd.Flags().String("from", "", "From start time")
crawlexecutionCmd.Flags().BoolVar(&crawlExecFlags.orderDesc, "desc", false, "Order descending")
crawlexecutionCmd.Flags().BoolVarP(&crawlExecFlags.watch, "watch", "w", false, "Get a continous stream of changes")

ReportCmd.AddCommand(crawlexecutionCmd)
return crawlexecutionCmd
}

func CreateCrawlExecutionsListRequest(ids []string) (*reportV1.CrawlExecutionsListRequest, error) {
func createCrawlExecutionsListRequest(ids []string) (*reportV1.CrawlExecutionsListRequest, error) {
request := &reportV1.CrawlExecutionsListRequest{
Id: ids,
Watch: crawlExecFlags.watch,
PageSize: crawlExecFlags.pageSize,
Offset: crawlExecFlags.page,
OrderByPath: crawlExecFlags.orderByPath,
Id: ids,
Watch: crawlExecFlags.watch,
PageSize: crawlExecFlags.pageSize,
Offset: crawlExecFlags.page,
OrderByPath: crawlExecFlags.orderByPath,
OrderDescending: crawlExecFlags.orderDesc,
}

if crawlExecFlags.watch {
request.PageSize = 0
}

if crawlExecFlags.from != nil {
fmt.Println(crawlExecFlags.from)
request.StartTimeFrom = timestamppb.New(*crawlExecFlags.from)
}

if crawlExecFlags.to != nil {
request.StartTimeTo = timestamppb.New(*crawlExecFlags.to)
}

if len(crawlExecFlags.states) > 0 {
for _, state := range crawlExecFlags.states {
if s, ok := frontierV1.CrawlExecutionStatus_State_value[state]; !ok {
Expand Down
45 changes: 22 additions & 23 deletions src/cmd/reports/crawllog.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package reports

import (
"context"
"fmt"
commonsV1 "github.com/nlnwa/veidemann-api/go/commons/v1"
logV1 "github.com/nlnwa/veidemann-api/go/log/v1"
"github.com/nlnwa/veidemannctl/src/apiutil"
Expand All @@ -24,7 +25,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"io"
"os"
)

var crawllogFlags struct {
Expand All @@ -38,12 +38,22 @@ var crawllogFlags struct {
watch bool
}

// init registers the command-line flags of crawllogCmd, binding each flag to
// the matching field of crawllogFlags.
func init() {
	flags := crawllogCmd.Flags()

	// Paging.
	flags.Int32VarP(&crawllogFlags.pageSize, "pagesize", "s", 10, "Number of objects to get")
	flags.Int32VarP(&crawllogFlags.page, "page", "p", 0, "The page number")

	// Output formatting.
	flags.StringVarP(&crawllogFlags.format, "output", "o", "table", "Output format (table|wide|json|yaml|template|template-file)")
	flags.StringVarP(&crawllogFlags.goTemplate, "template", "t", "", "A Go template used to format the output")

	// Selection, destination and streaming.
	flags.StringVarP(&crawllogFlags.filter, "filter", "q", "", "Filter objects by field (i.e. meta.description=foo")
	flags.StringVarP(&crawllogFlags.file, "filename", "f", "", "File name to write to")
	flags.BoolVarP(&crawllogFlags.watch, "watch", "w", false, "Get a continous stream of changes")
}

// crawllogCmd represents the crawllog command
var crawllogCmd = &cobra.Command{
Use: "crawllog",
Short: "View crawl log",
Long: `View crawl log.`,
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
client, conn := connection.NewLogClient()
defer conn.Close()

Expand All @@ -53,23 +63,23 @@ var crawllogCmd = &cobra.Command{
ids = args
}

request, err := CreateCrawlLogListRequest(ids)
request, err := createCrawlLogListRequest(ids)
if err != nil {
log.Fatalf("Error creating request: %v", err)
return fmt.Errorf("error creating request: %w", err)
}

r, err := client.ListCrawlLogs(context.Background(), request)
if err != nil {
log.Fatalf("Error from controller: %v", err)
return fmt.Errorf("error from controller: %w", err)
}

out, err := format.ResolveWriter(crawllogFlags.file)
if err != nil {
log.Fatalf("Could not resolve output '%v': %v", crawllogFlags.file, err)
return fmt.Errorf("could not resolve output '%s': %w", crawllogFlags.file, err)
}
s, err := format.NewFormatter("CrawlLog", out, crawllogFlags.format, crawllogFlags.goTemplate)
if err != nil {
log.Fatal(err)
return err
}
defer s.Close()

Expand All @@ -79,29 +89,18 @@ var crawllogCmd = &cobra.Command{
break
}
if err != nil {
log.Fatalf("Error getting object: %v", err)
return err
}
log.Debugf("Outputting crawl log record with WARC id '%s'", msg.WarcId)
if s.WriteRecord(msg) != nil {
os.Exit(1)
if err := s.WriteRecord(msg); err != nil {
return err
}
}
return nil
},
}

// init registers the command-line flags of crawllogCmd, binding each flag to
// the matching field of crawllogFlags, and then attaches the command to
// ReportCmd.
func init() {
	flags := crawllogCmd.Flags()

	// Paging.
	flags.Int32VarP(&crawllogFlags.pageSize, "pagesize", "s", 10, "Number of objects to get")
	flags.Int32VarP(&crawllogFlags.page, "page", "p", 0, "The page number")

	// Output formatting.
	flags.StringVarP(&crawllogFlags.format, "output", "o", "table", "Output format (table|wide|json|yaml|template|template-file)")
	flags.StringVarP(&crawllogFlags.goTemplate, "template", "t", "", "A Go template used to format the output")

	// Selection, destination and streaming.
	flags.StringVarP(&crawllogFlags.filter, "filter", "q", "", "Filter objects by field (i.e. meta.description=foo")
	flags.StringVarP(&crawllogFlags.file, "filename", "f", "", "File name to write to")
	flags.BoolVarP(&crawllogFlags.watch, "watch", "w", false, "Get a continous stream of changes")

	ReportCmd.AddCommand(crawllogCmd)
}

func CreateCrawlLogListRequest(ids []string) (*logV1.CrawlLogListRequest, error) {
func createCrawlLogListRequest(ids []string) (*logV1.CrawlLogListRequest, error) {
request := &logV1.CrawlLogListRequest{}
request.WarcId = ids
request.Watch = crawllogFlags.watch
Expand Down
Loading

0 comments on commit e5dd1c2

Please sign in to comment.