Skip to content

Commit

Permalink
[fix] 終了処理まわりを修正
Browse files Browse the repository at this point in the history
  • Loading branch information
shinosaki committed Feb 19, 2025
1 parent a01ee9e commit 3d58f39
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 22 deletions.
21 changes: 17 additions & 4 deletions internal/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,25 @@ func Alert(

log.Println("Alert: detected live stream for user id", userId, userName)
go func() {
sc.AddTask(program.Id, func() {})
var proc *exec.Cmd

sc.AddTask(program.Id, func() {
// CancelTaskされた場合
// exec.CommandしたRecorderにSIGINTを送信する
if proc != nil {
log.Println("Alert: send interrupt to recorder", program.Id)
if err := proc.Process.Signal(os.Interrupt); err != nil {
log.Println("Alert: send interrupt to recorder", program.Id, err)
}
if err := proc.Wait(); err != nil {
log.Println("Alert: terminated recorder process", program.Id, err)
}
}
})
defer sc.CancelTask(program.Id)

// nico.Client(program.Id, client, sc)
configPath := sc.GetValue(consts.CONFIG_PATH).(string)
proc := exec.Command(os.Args[0], "recorder", program.Id, "--config", configPath)
proc = exec.Command(os.Args[0], "recorder", program.Id, "--config", configPath)
utils.SetSID(proc)

if err := proc.Run(); err != nil {
Expand Down Expand Up @@ -78,7 +91,7 @@ func Alert(
select {
case <-sc.Context().Done():
log.Println("Alert: receive interrupt...")
// sc.Wait()
sc.CancelAllTasks()
return
case <-ticker.C:
fetchPrograms(false)
Expand Down
14 changes: 14 additions & 0 deletions internal/utils/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ContextKey string
type SignalContext struct {
wg *sync.WaitGroup
tasks *sync.Map
once sync.Once
ctx context.Context
cancel context.CancelFunc
}
Expand All @@ -23,6 +24,7 @@ func NewSignalContext() *SignalContext {
sc := &SignalContext{
wg: &sync.WaitGroup{},
tasks: &sync.Map{},
once: sync.Once{},
ctx: ctx,
cancel: cancel,
}
Expand Down Expand Up @@ -63,6 +65,18 @@ func (c *SignalContext) CancelTask(id any) {
}
}

func (c *SignalContext) CancelAllTasks() {
c.once.Do(func() {
c.tasks.Range(func(key, value any) bool {
canceler := value.(context.CancelFunc)
canceler()
c.tasks.Delete(key)
c.wg.Done()
return true
})
})
}

func (c *SignalContext) GetValue(key ContextKey) any {
return c.ctx.Value(key)
}
Expand Down
22 changes: 17 additions & 5 deletions internal/utils/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,25 @@ import (
)

func ExecCommand(command []string, sc *SignalContext) {
var proc *exec.Cmd

commandString := strings.Join(command, " ")
sc.AddTask(commandString, func() {})
sc.AddTask(commandString, func() {
if proc != nil {
if err := proc.Process.Signal(os.Interrupt); err != nil {
log.Println("Exec Command: send interrupt to process", err)
}
if err := proc.Wait(); err != nil {
log.Println("Exec Command: terminated process", err)
}
}
})
log.Println("Exec Command:", commandString)

go func() {
defer sc.CancelTask(commandString)

proc := exec.Command(command[0], command[1:]...)
proc = exec.Command(command[0], command[1:]...)

if err := proc.Start(); err != nil {
log.Println("Exec Command: failed to start", err)
Expand All @@ -25,9 +36,10 @@ func ExecCommand(command []string, sc *SignalContext) {
go func() {
<-sc.Context().Done()
log.Println("Exec Command: receive interrupt, send interrupt to", command[0])
if err := proc.Process.Signal(os.Interrupt); err != nil {
log.Printf("Exec Command: %s's interrupt message, %v", command[0], err)
}
// if err := proc.Process.Signal(os.Interrupt); err != nil {
// log.Printf("Exec Command: %s's interrupt message, %v", command[0], err)
// }
sc.CancelTask(commandString)
}()

if err := proc.Wait(); err != nil {
Expand Down
18 changes: 15 additions & 3 deletions internal/utils/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,28 @@ func SaveToFile[T any](data []T, path string) error {
}
}

data, err := json.Marshal(append(existsData, data...))
// marshalできないデータを弾く
var failedItems []T
for _, item := range data {
_, err := json.Marshal(item)
if err != nil {
log.Println("failed to marshal item:", err)
failedItems = append(failedItems, item)
continue
}
existsData = append(existsData, item)
}

result, err := json.Marshal(existsData)
if err != nil {
return err
}

file.Seek(0, 0) // seek to head
file.Truncate(0) // remove data

_, err = file.Write(data)
log.Printf("Write %d data to %s", len(data), path)
_, err = file.Write(result)
log.Printf("Write %d data to %s", len(existsData), path)

data = nil
return err
Expand Down
34 changes: 24 additions & 10 deletions pkg/nico/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func Client(
go websocketHandler(sc, ws, client, messageChan, data)

ws.Wait()

sc.CancelAllTasks()
sc.Wait()

return nil
Expand All @@ -57,7 +59,10 @@ func websocketHandler(
messageChan chan nicowebsocket.Message,
programData *ProgramData,
) {
sc.AddTask("websocket", func() {})
sc.AddTask("websocket", func() {
log.Println("NicoClient: close websocket...")
ws.Disconnect()
})
defer sc.CancelTask("websocket")

config, _ := sc.GetValue(consts.CONFIG).(*utils.Config)
Expand Down Expand Up @@ -87,7 +92,7 @@ func websocketHandler(
select {
case <-sc.Context().Done():
log.Println("NicoClient: receive interrupt...")
ws.Disconnect()
// ws.Disconnect()
return
// ws.doneの処理が必要かも
case message, ok := <-messageChan:
Expand Down Expand Up @@ -131,15 +136,23 @@ func commentHandler(
sc *utils.SignalContext,
client *http.Client,
) {
sc.AddTask(url+at, func() {})
defer sc.CancelTask(url + at)

var (
mu = &sync.Mutex{}
chatBuffer []*data.Chat
alreadySegments = make(map[string]struct{})
)

sc.AddTask(url+at, func() {
log.Println("CommentHandler: receive cancel")
mu.Lock()
if err := utils.SaveToFile(chatBuffer, outputPath); err != nil {
log.Println(err)
}
mu.Unlock()
log.Println("CommentHandler: cancel done")
})
defer sc.CancelTask(url + at)

// Periodic write chat data to file
go func() {
ticker := time.NewTicker(1 * time.Second)
Expand All @@ -148,11 +161,12 @@ func commentHandler(
for {
select {
case <-sc.Context().Done():
mu.Lock()
if err := utils.SaveToFile(chatBuffer, outputPath); err != nil {
log.Println(err)
}
mu.Unlock()
// mu.Lock()
// if err := utils.SaveToFile(chatBuffer, outputPath); err != nil {
// log.Println(err)
// }
// mu.Unlock()
sc.CancelTask(url + at)
return
case <-ticker.C:
mu.Lock()
Expand Down
13 changes: 13 additions & 0 deletions pkg/nico/nicowebsocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ func NewClient() (*Client, chan Message) {
}
}

case SCHEDULE:
var data ScheduleData
if err := json.Unmarshal(message.Data, &data); err != nil {
log.Println("Nico Websocket: unmarshal schedule data failed", err)
return
}

// End が現在時刻もしくは以前なら終了処理
if data.End.Before(time.Now()) || data.End.Equal(time.Now()) {
log.Println("Nico Websocket: received program has finished")
c.Disconnect()
}

case RECONNECT:
var data ReconnectData
if err := json.Unmarshal(message.Data, &data); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/nico/nicowebsocket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
STREAM MessageType = "stream"
RECONNECT MessageType = "reconnect"
MESSAGE_SERVER MessageType = "messageServer"
SCHEDULE MessageType = "schedule"
)

type Request struct {
Expand Down Expand Up @@ -81,3 +82,8 @@ type MessageServerData struct {
ViewURI string `json:"viewUri"`
VposBaseTime time.Time `json:"vposBaseTime"`
}

type ScheduleData struct {
Begin time.Time `json:"begin"`
End time.Time `json:"end"`
}

0 comments on commit 3d58f39

Please sign in to comment.