Skip to content

Commit

Permalink
refactor exec.Start() and don't provide context for callback
Browse files Browse the repository at this point in the history
make exec.Start() functions synchronous
callback functions should not be killed by context
  • Loading branch information
catatsuy committed Feb 20, 2021
1 parent 4abe6fd commit eeb5dfe
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.15.x' ]
go: [ '1.15.x', '1.16.x' ]
steps:

- name: Set up Go
Expand Down
18 changes: 11 additions & 7 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func (c *CLI) Run(args []string) int {

ex := throttle.NewExec(copyStdin)

exitC := make(chan os.Signal, 1)
signal.Notify(exitC, syscall.SIGTERM, syscall.SIGINT)
sigC := make(chan os.Signal, 1)
signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)

channel := c.conf.PrimaryChannel
if channel == "" {
Expand All @@ -189,31 +189,35 @@ func (c *CLI) Run(args []string) int {
IconEmoji: c.conf.IconEmoji,
}

flushCallback := func(_ context.Context, output string) error {
flushCallback := func(output string) error {
param.Text = output
return c.sClient.PostText(context.Background(), param)
}

done := make(chan struct{})

doneCallback := func(ctx context.Context, output string) error {
doneCallback := func(output string) error {
defer func() {
done <- struct{}{}
}()

return flushCallback(ctx, output)
return flushCallback(output)
}

ticker := time.NewTicker(c.conf.Duration)
defer ticker.Stop()

ctx, cancel := context.WithCancel(context.Background())

ex.Start(ctx, ticker.C, flushCallback, doneCallback)
exitC := make(chan struct{})
go func() {
ex.Start(ctx, ticker.C, flushCallback, doneCallback)
close(exitC)
}()

select {
case <-sigC:
case <-exitC:
case <-ex.Wait():
}
cancel()

Expand Down
27 changes: 15 additions & 12 deletions throttle/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (ex *Exec) stringAndReset() string {
return ex.writer.String()
}

func (ex *Exec) Start(ctx context.Context, interval <-chan time.Time, flushCallback func(ctx context.Context, output string) error, doneCallback func(ctx context.Context, output string) error) {
func (ex *Exec) Start(ctx context.Context, interval <-chan time.Time, flushCallback func(output string) error, doneCallback func(output string) error) {
go func() {
for {
line, _, err := ex.reader.ReadLine()
Expand All @@ -71,20 +71,23 @@ func (ex *Exec) Start(ctx context.Context, interval <-chan time.Time, flushCallb
panic(err)
}
}
ex.exitC <- struct{}{}
// if notify_slack receives EOF, this function will exit.
close(ex.exitC)
}()

go func() {
for {
select {
case <-interval:
flushCallback(ctx, ex.flush())
case <-ctx.Done():
doneCallback(ctx, ex.flush())
return
}
L:
for {
select {
case <-interval:
flushCallback(ex.flush())
case <-ctx.Done():
doneCallback(ex.flush())
break L
case <-ex.Wait():
doneCallback(ex.flush())
break L
}
}()
}
}

func (ex *Exec) Wait() <-chan struct{} {
Expand Down
14 changes: 9 additions & 5 deletions throttle/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"
)

func TestRun(t *testing.T) {
func TestRun_pipeClose(t *testing.T) {
pr, pw := io.Pipe()

output := new(bytes.Buffer)
Expand All @@ -20,7 +20,7 @@ func TestRun(t *testing.T) {
count := 0
fc := make(chan struct{})

flushCallback := func(_ context.Context, s string) error {
flushCallback := func(s string) error {
defer func() {
fc <- struct{}{}
// to random fail from Go 1.12 or later
Expand All @@ -36,7 +36,7 @@ func TestRun(t *testing.T) {

doneCount := 0

doneCallback := func(_ context.Context, s string) error {
doneCallback := func(s string) error {
defer func() {
fc <- struct{}{}
}()
Expand All @@ -48,7 +48,11 @@ func TestRun(t *testing.T) {
return nil
}

ex.Start(ctx, testC, flushCallback, doneCallback)
exitC := make(chan struct{})
go func() {
ex.Start(ctx, testC, flushCallback, doneCallback)
close(exitC)
}()

testC <- time.Time{}
<-fc
Expand Down Expand Up @@ -82,7 +86,7 @@ func TestRun(t *testing.T) {

// do not panic
pw.Close()
<-ex.Wait()
<-exitC

cancel()
<-fc
Expand Down

0 comments on commit eeb5dfe

Please sign in to comment.