-
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathanalyze_phrase.py
163 lines (128 loc) · 5.02 KB
/
analyze_phrase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from talon import actions, speech_system, registry
from talon.grammar import Phrase, Capture
from talon.grammar.vm import VMListCapture, VMCapture
from talon.engines.w2l import DecodeWord, WordMeta
from talon.scripting.types import CommandImpl
from talon_init import TALON_HOME
from typing import Optional, Any
import os
import re
from .types import AnalyzedPhrase, AnalyzedCommand, AnalyzedCapture, AnalyzedWord
from .calc_command_actions import calc_command_actions
SIM_RE = re.compile(r"""(?:\[\d+] "[^"]+"\s+path: ([^\n]+)\s+rule: "([^"]+))+""")
def analyze_phrase(phrase: Phrase) -> AnalyzedPhrase:
"""Analyze spoken phrase"""
phrase_text = get_phrase(phrase)
return AnalyzedPhrase(
phrase_text,
get_words(phrase),
get_metadata(phrase),
get_commands(phrase, phrase_text),
)
def get_phrase(phrase: Phrase) -> str:
return " ".join(phrase["phrase"])
def get_words(phrase: Phrase) -> list[AnalyzedWord]:
words = phrase["phrase"]
return [
AnalyzedWord(
str(word),
getattr(word, "start", None),
getattr(word, "end", None),
)
for word in words
]
def get_metadata(phrase: Phrase) -> Optional[dict]:
meta = phrase.get("_metadata")
if meta:
# We have already captured the phrase and don't need a duplication.
return {k: v for k, v in meta.items() if k not in {"emit", "decode"}}
return None
def get_commands(phrase: Phrase, phrase_text: str) -> list[AnalyzedCommand]:
captures = phrase["parsed"]
commands = get_commands_impl(captures, phrase_text)
return [
get_command(command, capture) for command, capture in zip(commands, captures)
]
def get_command(command: CommandImpl, capture: Capture) -> AnalyzedCommand:
code = command.script.code
capture_mapping = get_capture_mapping(capture)
return AnalyzedCommand(
" ".join(capture._unmapped),
command.rule.rule,
code,
get_path(command.script.filename),
command.script.start_line,
get_captures(capture),
capture_mapping,
calc_command_actions(code, capture_mapping),
)
def get_commands_impl(captures: list[Capture], phrase_text: str) -> list[CommandImpl]:
commands = try_get_last_commands(captures)
if commands:
return commands
commands = get_commands_from_sim(phrase_text)
if len(captures) != len(commands):
raise Exception(
f"Got incorrect number of commands({len(commands)}) for the list of captures({len(captures)})"
)
return commands
def try_get_last_commands(captures: list[Capture]) -> Optional[list[CommandImpl]]:
"""
Returns last command implementation if its captures matches the given list.
Repeat commands are missing from this list and then we can't use the list of last commands.
"""
recent_commands = actions.core.recent_commands()
if not recent_commands:
return None
last_commands = recent_commands[-1]
if len(captures) != len(last_commands):
return None
for c1, (_, c2) in zip(captures, last_commands):
if c1 != c2:
return None
return [cmd for cmd, _ in last_commands]
def get_commands_from_sim(phrase_text: str) -> list[CommandImpl]:
try:
raw_sim = speech_system._sim(phrase_text)
except Exception as ex:
raise Exception(f"Failed to run sim on phrase '{phrase_text}'. Ex: {ex}")
matches = SIM_RE.findall(raw_sim)
if not matches:
raise Exception(f"Can't parse sim '{raw_sim}'")
return [get_command_from_path(path, rule) for path, rule in matches]
def get_command_from_path(path: str, rule: str) -> CommandImpl:
context_name = path.replace(os.path.sep, ".")
if not context_name in registry.contexts:
raise Exception(f"Can't find context for path '{path}'")
context = registry.contexts[context_name]
commands = [c for c in context.commands.values() if c.rule.rule == rule]
if len(commands) != 1:
raise Exception(
f"Expected 1 command. Found {len(commands)} for rule '{rule}' in path '{path}'"
)
return commands[0]
def get_path(filename: str) -> str:
return os.path.relpath(filename, TALON_HOME)
def get_captures(capture: Capture) -> list[AnalyzedCapture]:
captures = []
for i, value in enumerate(capture):
c = capture._capture[i]
if isinstance(c, DecodeWord) or isinstance(c, WordMeta):
phrase = c.word
name = None
elif isinstance(c, VMCapture):
phrase = " ".join(c.unmapped())
name = c.name
elif isinstance(c, VMListCapture):
phrase = c.value
name = c.name
else:
raise Exception(f"Unknown capture type '{type(c)}'")
captures.append(AnalyzedCapture(phrase, value, name))
return captures
def get_capture_mapping(capture: Capture) -> dict[str, Any]:
mapping = {}
for k, v in capture._mapping.items():
if not k.endswith("_list") and not "." in k:
mapping[k] = v
return mapping