-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexpect.py
199 lines (168 loc) · 6.99 KB
/
expect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import pdb
import re
from beancount.core import data
from dateutil.rrule import rrule, FREQNAMES
import datetime as dt
from itertools import groupby
from beancount.core.number import D
from beancount.core.amount import Amount
from decimal import ROUND_HALF_UP
# Entry points that Beancount calls when this module is loaded as a plugin.
__plugins__ = ['expect']

# Number of occurrences per year for each supported frequency name.
# Used to turn a duration in years into an occurrence count.
multiplier = {
    'YEARLY': 1,
    'MONTHLY': 12,
    'WEEKLY': 52,
    'DAILY': 365.25,
}
def _is_origin(entry):
    """Return whether entry is an origin for an expectation.

    An entry is an origin when its ``expected`` metadata is either the
    boolean ``True`` or a string equal to "true" or "t"
    (case-insensitive). A missing key, missing metadata, or any other
    value yields ``False``.

    :param entry: Transaction to inspect
    :returns: Boolean
    """
    assert isinstance(entry, data.Transaction)
    flag = (entry.meta or {}).get('expected')
    if isinstance(flag, bool):
        # The metadata may hold a real boolean instead of a string
        # (presumably from an unquoted TRUE/FALSE in the ledger).
        return flag
    # Only strings can be compared textually; anything else is not a flag.
    return isinstance(flag, str) and flag.lower() in ("true", "t")
def get_expected_dates(entry):
    """Get expected dates based on entry parameters.

    :param entry: Entry
    :returns: list of expected dates, the first one being the date of
        the entry itself

    Reads entry metadata:
    - frequency :: one of the names in ``multiplier`` (RFC 5545 names,
      https://datatracker.ietf.org/doc/html/rfc5545), case-insensitive;
      any other value defaults to monthly
    - interval :: interval between expectations (default 1)
    - until OR count OR duration_in_years :: either end date (ISO
      string or date), or number of expected occurrences, or a number
      of years from which the number of occurrences is derived.
      ``until`` and ``count`` MUST NOT be given both in the same entry
      (https://dateutil.readthedocs.io/en/stable/rrule.html#dateutil.rrule.rrule)

    When only ``until`` is given, no occurrence count is passed to the
    recurrence rule, so the end date alone bounds the result. When
    nothing is given, one year of occurrences is generated.
    """
    meta = entry.meta

    freq_param = str(meta.get('frequency', 'monthly')).upper()
    if freq_param not in multiplier:
        # Unknown or unsupported frequency: default to monthly.
        freq_param = 'MONTHLY'
    freq = FREQNAMES.index(freq_param)

    # Metadata numbers may be Decimal or strings; rrule needs an int.
    interval = int(meta.get('interval', 1))

    until = meta.get('until') or None
    if isinstance(until, str):
        until = dt.date.fromisoformat(until)

    if 'count' in meta:
        count = int(meta['count'])
    elif until is not None and 'duration_in_years' not in meta:
        # Let the end date alone bound the rule: a default count would
        # silently cap it and mixes count with until.
        count = None
    else:
        duration = float(meta.get('duration_in_years', 1))
        # Truncate to whole occurrences (e.g. 365.25 daily -> 365).
        count = int(multiplier[freq_param] * duration / interval)

    occurrences = rrule(freq=freq,
                        dtstart=entry.date,
                        count=count,
                        interval=interval,
                        until=until)
    return [occurrence.date() for occurrence in occurrences]
def all_equal(iterable):
    """Tell whether every member of iterable compares equal.

    :param iterable: An iterable
    :returns: Boolean; True for an empty iterable

    https://stackoverflow.com/a/3844832/2265140
    """
    runs = groupby(iterable)
    # No run at all: the iterable is empty, which counts as "all equal".
    if next(runs, None) is None:
        return True
    # A second run exists only if some member differs from the first.
    return next(runs, None) is None
def _have_same_postings_accounts(entries: list) -> bool:
    """Tell whether all given entries post to the same set of accounts.

    Only the set of account names is compared; order, repetition and
    amounts of the postings are ignored.

    :param entries: List of entries
    :returns: Boolean
    """
    account_sets = [
        {posting.account for posting in entry.postings}
        for entry in entries
    ]
    return True if all_equal(account_sets) else False
def _is_overtrown_by_real_entry(exp_entry,
                                entries,
                                margin: int = 7):
    """Tell whether an expected entry is overthrown by an existing one.

    :param exp_entry: Entry to be checked
    :param entries: Existing entries
    :param margin: Margin in days
    :returns: Boolean

    An existing entry overthrows the expected one when it posts to the
    same accounts and is dated no earlier than ``margin`` days before
    the expected date, i.e. it is later, or earlier but within the
    margin. Such an expected entry is treated as a duplicate of the
    existing one.
    """
    return any(
        candidate.date >= exp_entry.date - dt.timedelta(days=margin)
        for candidate in entries
        if _have_same_postings_accounts([exp_entry, candidate])
    )
def create_expected(entry, exp_date):
    """Create a copy of entry dated exp_date and tagged as expected.

    If the entry has an ``amount`` metadata, the first unsigned decimal
    number found in it replaces the amounts of the two postings: the
    posting with a positive amount receives the value, the other one
    receives its negation. Each posting keeps its own currency. The
    value is rounded to two decimals, half up.

    The returned entry carries the additional tag ``expected`` and only
    the ``filename`` and ``lineno`` metadata of the original.

    :param entry: Transaction used as template
    :param exp_date: Date of the new entry
    :returns: The new entry
    :raises ValueError: if the ``amount`` metadata contains no number
    """
    # create entry with expected date
    new_entry = entry._replace(date=exp_date)

    # use amount in meta, if available
    amount_meta = new_entry.meta.get('amount')
    if amount_meta:
        # str() so that a non-string metadata value can be searched too
        match = re.search(r"\d+(?:\.\d+)?", str(amount_meta))
        if match is None:
            raise ValueError(
                "no number found in 'amount' metadata: %r" % (amount_meta,))
        # round using Decimal method `quantize'
        rounded_val = D(match.group(0)).quantize(D('.01'),
                                                 rounding=ROUND_HALF_UP)
        assert len(new_entry.postings) == 2
        new_postings = []
        for posting in new_entry.postings:
            # the posting with the same (positive) sign gets the new
            # amount, the other one gets the opposite amount
            if posting.units.number * rounded_val > 0:
                new_number = rounded_val
            else:
                new_number = rounded_val * -1
            new_units = Amount(new_number, posting.units.currency)
            new_postings.append(posting._replace(units=new_units))
        new_entry = new_entry._replace(postings=new_postings)

    # add tag
    new_entry = new_entry._replace(tags=new_entry.tags.union({'expected'}))

    # remove metadata, except the reference to the source location
    new_meta = {'filename': new_entry.meta.get('filename'),
                'lineno': new_entry.meta.get('lineno')}
    return new_entry._replace(meta=new_meta)
def expect(entries, options_map, config_string="{}"):
    """Create expected entries based on entry metadata.

    Transactions flagged as origins are replaced by a processed copy
    (amount taken from metadata, tagged as expected) and generate one
    expected entry per expected date. The processed origin and each
    expected entry are dropped when an existing transaction overthrows
    them.

    :param entries: Entries
    :param options_map: Unused option map
    :param config_string: Unused configuration string
    :returns: Existing entries and expected entries, and errors. The
        entries are ordered as: non-transactions, processed origins,
        other transactions, expected entries.
    """
    errors = []
    forecasted = []
    # all non- ~data.Transactions~, passed through untouched
    non_txns = []
    # all ~data.Transactions~, origins included
    txns = []
    # transactions that are not origins, passed through untouched
    non_origin_txns = []
    # transactions that generate expectations ("origins")
    origin_txns = []
    for entry in entries:
        if not isinstance(entry, data.Transaction):
            non_txns.append(entry)
            continue
        txns.append(entry)
        if _is_origin(entry):
            origin_txns.append(entry)
        else:
            non_origin_txns.append(entry)

    # list for the origins with the amount replaced by meta, if any
    processed_origin_txns = []
    for txn in origin_txns:
        # replace origin txns with ones containing the amount in meta, if any
        processed_origin_txn = create_expected(txn, txn.date)
        # The origin must not be compared with itself: it has the same
        # accounts and date, so it would always overthrow its own copy.
        other_txns = [other for other in txns if other is not txn]
        if not _is_overtrown_by_real_entry(processed_origin_txn, other_txns):
            processed_origin_txns.append(processed_origin_txn)

        # iterate the expected dates
        for expected_date in get_expected_dates(txn):
            # copy the entry, using the expected date and tag with 'expected'
            expected_entry = create_expected(txn, expected_date)
            # Skip duplicates of existing transactions. The origin is
            # kept in the comparison on purpose, so the occurrence on
            # the origin's own date is not emitted a second time.
            if not _is_overtrown_by_real_entry(expected_entry, txns):
                forecasted.append(expected_entry)

    processed_entries = (non_txns + processed_origin_txns
                         + non_origin_txns + forecasted)
    return processed_entries, errors