-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrecordscan.py
206 lines (174 loc) · 7.64 KB
/
recordscan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#
# Scan a MARC file or database table and check for records that match specific tests
# Check for possible duplicate author names
#
# Usage: python recordscan.py --inputfile <MARC input file>
# or: python recordscan.py --inputtable <database table>
#
# The database table should have columns for bibnumber, tag, indicators, and tagData.
# tagData is all the subfields glommed together. You can get more information from
# the mydb.py file.
#
# Version: 0.1.0 1/1/21
#
# License: CC BY-NC-SA 4.0, https://creativecommons.org/licenses/by-nc-sa/4.0/
#
# Graeme Williams
#
from collections import Counter
import re
import argparse
from pymarc import Record, Field
from typing import Callable, Set, List, Tuple # just used for type hints
from lib import mymarc
# Module-level state shared by the checks and the final report
recordCounter: Counter = Counter()  # check label -> number of matching records
authorSet: Set[str] = set()  # author keys harvested by collect_authors
# Utility functions
def to_string_simple(the_record : Record) -> str:
    """Return a short "<001>/<245$a>" label identifying a record in the output.

    This is only used to point at faulty records, so any MARC fields could
    be substituted here.  A missing piece is replaced by a placeholder
    ("No 001", "No 245" or "No 245a") rather than raising, because the
    records being reported are often the malformed ones.

    The 245$a part is truncated to 60 characters.
    """
    control_field = the_record['001']
    title_field = the_record['245']

    record_id = control_field.data if control_field else "No 001"

    if not title_field:
        title = "No 245"
    else:
        title_a = title_field['a']
        # a 245 without $a gives None here; slicing it would raise TypeError
        title = title_a[:60] if title_a is not None else "No 245a"

    return record_id + '/' + title
# *** Next four functions are concerned with harvesting author data
# *** from the $a and $d subfields of 100 and 700 fields so we
# *** can check for possible duplicates (e.g., Smith, Bob ~ Smith, Bob, 1972-)
# for each 100 and 700 field the $a and $d fields are jammed together; we split them later
def collect_authors(arecord : Record):
    """Add the authors found in a record's 100 and 700 fields to authorSet.

    Each author is stored as one string: the $a subfield with trailing
    commas/periods removed, followed - when $d is present - by "#" and the
    $d subfield with trailing commas/periods removed.  Fields without a
    $a subfield are ignored.
    """
    def author_key(field):
        # build "name" or "name#dates" from the $a and $d subfields
        name = field['a'].rstrip(",.")
        dates = field['d']
        if dates:
            return name + "#" + dates.rstrip(",.")
        return name

    main_entry = arecord['100']
    if main_entry and main_entry['a']:
        authorSet.add(author_key(main_entry))

    for added_entry in arecord.get_fields('700'):
        if added_entry['a']:
            authorSet.add(author_key(added_entry))
# Pattern for the "name#birth-death" keys built by collect_authors.
# The dot after the death year is optional: collect_authors strips trailing
# ",." from $d before building the key, so a mandatory dot would prevent any
# key carrying both dates from matching.
rgx = re.compile(r"(.*)(#\d{4}-)(\d{4}\.?)?$")
def author_split(author_string : str) -> Tuple:
    """Split an author key into (name, birth date, death date).

    The birth part keeps its delimiters (e.g. "#1900-"); the death part is
    the four-digit year, with a trailing dot if one was present.  Parts
    that are missing from the key are returned as None; a key with no
    recognisable "#dddd-" section is returned whole as the name.
    """
    m = rgx.match(author_string)
    if m is None:
        return author_string, None, None
    # an optional group that did not take part in the match is None
    return m[1], m[2], m[3]
def author_equals(x : str, y : str) -> bool:
    """Return True when two author keys could refer to the same person.

    Both keys are case-folded and split with author_split into
    (name, birth date, death date).  A missing (None or empty) part matches
    anything, so the result is False only when some part is present on
    both sides and differs.
    """
    parts_x = author_split(x.casefold())
    parts_y = author_split(y.casefold())
    return not any(
        px and py and px != py
        for px, py in zip(parts_x, parts_y)
    )
def check_for_duplicate_authors() -> List[str]:
    """Return "earlier ~ later" strings for possible duplicates in authorSet.

    The global authorSet is sorted and each author is compared, using
    author_equals, with the one immediately before it; only such
    consecutive pairs are reported.
    """
    ordered = sorted(authorSet)
    # "+++++" stands in as the predecessor of the first author
    predecessors = ["+++++"] + ordered
    return [
        earlier + " ~ " + later
        for earlier, later in zip(predecessors, ordered)
        if author_equals(later, earlier)
    ]
# *** Predicates that are too complicated to put into a lambda go here ***
def no1xx7xx(the_record : Record) -> bool:
    """Return True when the record has none of the 1xx/7xx fields checked here."""
    # NOTE(review): 111 and 130 are not in this list although the name says
    # "1xx" - confirm whether that omission is intended.
    entry_tags = ('100', '110', '700', '710', '711', '720', '730')
    for tag in entry_tags:
        if the_record[tag]:
            return False
    return True
def check245c1xx7xx(the_record : Record) -> bool:
    """Return True if 245$c contains "and others" but the record has no 1xx/7xx.

    Yes, this is an odd test; it was an experiment.

    A record with no 245 field, or a 245 without $c, gives False.  The
    record passed in is the one examined (not a module-level variable),
    and the result is always a real bool.
    """
    title_field = the_record['245']
    if not title_field:
        # without a 245 there is no $c to inspect
        return False
    responsibility = title_field['c']
    return (
        bool(responsibility)
        and "and others" in responsibility
        and no1xx7xx(the_record)
    )
def indicator7butnodollar2(the_record : Record) -> bool:
    """Return True if any subject field has indicator 2 = '7' but no $2 subfield.

    The fields examined are those returned by the record's subjects() method.
    """
    for subject in the_record.subjects():
        if subject.indicator2 == '7' and not subject.get_subfields('2'):
            return True
    return False
def duplicate_subjects(the_record : Record) -> bool:
    """Return True if the record has duplicate 650 subject headings.

    Two headings count as duplicates when the subfields a, b, c, d, v, x,
    y and z that they carry are equal after a trailing dot, if any, is
    removed from each value.  The most common use case is detecting
    duplicates when both LCSH and FAST headings are present.
    """
    # only these subfields take part in the comparison
    compared_codes = ('a', 'b', 'c', 'd', 'v', 'x', 'y', 'z')

    # one dict per 650 field: subfield code -> value without trailing dot
    headings = [
        {code: field[code].rstrip('.') for code in compared_codes if field[code]}
        for field in the_record.get_fields('650')
    ]

    # dicts are unhashable, so remember the ones already seen in a list
    seen = []
    for heading in headings:
        if heading in seen:
            return True
        seen.append(heading)
    return False
def checkfactory(label: str, predicate: Callable, print_this: Callable = None) -> Callable:
    """Turn a predicate into a function that checks and counts records.

    label      -- used both as the key in recordCounter and to label output
    predicate  -- called with a record; a truthy result means "matches"
    print_this -- optional; called with each matching record, and its
                  result is printed after the label

    Returns the check function, bound to label, predicate and print_this.
    """
    # create the counter entry now so the label is present even if no
    # record ever matches
    recordCounter[label] = 0

    def check(record):
        if not predicate(record):
            return
        recordCounter[label] += 1
        if print_this:
            print(label, ': ', print_this(record), sep='')

    return check
# The checks to run against every record.
# Each entry is a *function* built by checkfactory from a label, a predicate
# and, optionally, a function that renders a matching record for printing.
checkList = (
    # using inline lambda functions
    # checkfactory('100', lambda r: r['100']),
    checkfactory('no 001', lambda r: not r['001']),
    checkfactory('no 006', lambda r: not r['006']),
    checkfactory('no 100', lambda r: not r['100'], to_string_simple),
    checkfactory('245', lambda r: r['245']),
    # checkfactory('no 245', lambda r: not r['245']),
    # A record without any 245 has no 245$c either; test the field first so
    # such a record is counted instead of failing on a subscript of None.
    checkfactory(
        'no 245c',
        lambda r: not (r['245'] and r['245']['c']),
        to_string_simple,
    ),
    checkfactory(
        'old style 041',
        lambda r: r['041'] and r['041']['a'] and len(r['041']['a']) > 3,
        # optional printer: lambda r: r['041'],
    ),
    checkfactory(
        '041 but no 546',
        lambda r: r['041'] and not r['546'],
        to_string_simple,
    ),
    checkfactory(
        '041h w. wrong indicator',
        lambda r: r['041'] and r['041']['h'] and r['041'].indicators[0] != '1',
        # optional printer: lambda r: r['041'],
    ),
    # checkfactory('999', lambda r: r['999']),
    # using predicate functions defined above
    checkfactory('no 1xx or 7xx', no1xx7xx, to_string_simple),
    checkfactory('"and others" in 245c but no 1xx/7xx', check245c1xx7xx),
    checkfactory(
        'indicator 7 but no $2 in 6xx',
        indicator7butnodollar2,
        to_string_simple,
    ),
    checkfactory(
        'duplicate subject headings',
        duplicate_subjects,
        to_string_simple,
    ),
)
# Command line: exactly one of --inputfile / --inputtable must be supplied
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
# --inputfile names a MARC input file, --inputtable a database table
group.add_argument("--inputfile", "-if")
group.add_argument("--inputtable", "-it")
# the parsed values are read as args.inputfile and args.inputtable
args = parser.parse_args()
# Main scan over every MARC record in the input file or database table:
#   1. harvest the authors from its 100 and 700 fields;
#   2. apply every check function in checkList.
for bibnum, theRecord in mymarc.recordgenerator(args.inputfile, args.inputtable):
    collect_authors(theRecord)
    for run_check in checkList:
        run_check(theRecord)
# Report how many records matched each check.
# Each key of recordCounter is the label originally passed to checkfactory.
print("\nSummary of record found in different categories")
for label, count in recordCounter.items():
    print(count, "records have", label)

# Report possible duplicate authors, one pair per line
# (e.g., Smith, Bob ~ Smith, Bob#1972-)
print("\nPossible duplicate NARs")
print("\n".join(check_for_duplicate_authors()))
# debugging aid: print(sorted(authorSet))