-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzotero.py
356 lines (317 loc) · 13.6 KB
/
zotero.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
from __future__ import annotations
from typing import Optional, List, Dict, Any, Literal
import os
import json
import base64
import pathlib
import urllib
from pyzotero import zotero
from mcp.server.fastmcp import FastMCP, Context
from mcp.types import EmbeddedResource, BlobResourceContents
# FastMCP server instance named "Zotero"; the `@mcp.tool` functions in this
# module attach themselves to it.
mcp = FastMCP(
    "Zotero",
    dependencies=["pyzotero", "mcp[cli]"],
)
class ZoteroWrapper(zotero.Zotero):
    """pyzotero client configured for the local Zotero API, plus formatting helpers."""

    def __init__(self):
        """Create a client that talks to the local Zotero instance.

        The library id is taken from the ``ZOTERO_USER_ID`` environment
        variable and falls back to ``0`` when the variable is unset.

        Raises:
            RuntimeError: If the underlying pyzotero client cannot be
                initialized. The original exception is chained as the cause.
        """
        user_id = os.getenv('ZOTERO_USER_ID')
        if user_id is None:
            user_id = 0
        try:
            # FIXME: Work around a bug #202 in pyzotero: construct with a
            # placeholder library id and assign the real one afterwards.
            super().__init__(1, 'user', '', local=True)
        except Exception as e:
            # `__init__` must not return a value (doing so raises TypeError and
            # hides the real problem), so surface the failure as an exception.
            raise RuntimeError(f"Failed to initialize Zotero connection. {e}") from e
        self.library_id = user_id

    def format_creators(self, creators: List[Dict[str, str]]) -> str:
        """Join creator names into one comma-separated string.

        Args:
            creators: Creator dicts. Each may carry ``firstName``/``lastName``
                or a single-field ``name``.

        Returns:
            The names joined by ``", "``, or ``"No authors listed"`` when no
            creator yields a name.
        """
        names = []
        for creator in creators:
            full_name = ' '.join(
                part
                for part in (creator.get('firstName'), creator.get('lastName'))
                if part
            )
            if not full_name:
                # Single-field creators (e.g. institutions) only provide `name`.
                full_name = creator.get('name') or ''
            if full_name:
                names.append(full_name)
        return ', '.join(names) or "No authors listed"

    def format_item(self, item: Dict[str, Any], include_abstract: bool = True) -> Dict[str, Any]:
        """Reduce a Zotero item to a small dictionary of commonly used fields.

        Args:
            item: Item dict whose fields are read from its ``data`` entry.
            include_abstract: When true, add ``abstractNote`` to the result.

        Returns:
            A dict with ``title``, ``authors``, ``date``, ``key`` and
            ``itemType``; plus ``abstractNote`` (if requested) and ``doi``,
            ``url``, ``publicationTitle`` and ``tags`` when the item has them.
        """
        data = item.get('data', {})
        formatted = {
            'title': data.get('title', 'Untitled'),
            'authors': self.format_creators(data.get('creators', [])),
            'date': data.get('date', 'No date'),
            'key': data.get('key'),
            'itemType': data.get('itemType', 'Unknown type'),
        }
        if include_abstract:
            formatted['abstractNote'] = data.get('abstractNote', 'No abstract available')
        # Optional fields are copied only when present: (source key, output key).
        for source_key, target_key in (
            ('DOI', 'doi'),
            ('url', 'url'),
            ('publicationTitle', 'publicationTitle'),
        ):
            if source_key in data:
                formatted[target_key] = data[source_key]
        if 'tags' in data:
            # Keep only the non-empty tag labels.
            formatted['tags'] = [t.get('tag') for t in data.get('tags', []) if t.get('tag')]
        return formatted

    @zotero.retrieve
    def file_url(self, item, **kwargs) -> str:
        """Build the request path for an item's ``file/view/url`` endpoint.

        The item key is upper-cased before being placed in the path.
        NOTE(review): the ``zotero.retrieve`` decorator presumably performs the
        request and returns the response content - confirm against pyzotero.
        """
        query_string = (
            f"/{self.library_type}/{self.library_id}/items/{item.upper()}/file/view/url"
        )
        return self._build_query(query_string, no_params=True)
@mcp.tool(description="List all collections in the local Zotero library.")
def get_collections(limit: Optional[int] = None, *, ctx: Context) -> str:
    """Return the collections of the local Zotero library as JSON text.

    Args:
        limit: Optional how many items to return.
        ctx: MCP request context, used to report a failure.

    Returns:
        Indented JSON: the collections as returned by the client, or an object
        with an ``error`` key when the lookup fails.
    """
    try:
        zot = ZoteroWrapper()
        return json.dumps(zot.collections(limit=limit), indent=2)
    except Exception as exc:
        failure = f"Failed to fetch collections. Message: {exc}"
        if ctx._fastmcp:
            # NOTE(review): called without `await`; if `Context.error` is a
            # coroutine function nothing is emitted here - confirm.
            ctx.error(failure)
        return json.dumps({"error": failure}, indent=2)
@mcp.tool(description="Gets all items in a specific Zotero collection.")
def get_collection_items(collection_key: str, limit: Optional[int] = None, *, ctx: Context) -> str:
    """
    Return the formatted items of one Zotero collection as JSON text.

    Args:
        collection_key: The collection key/ID
        limit: Optional how many items to return.
        ctx: MCP request context, used to report a failure.

    Returns:
        Indented JSON: a list of formatted items, or an object with an
        ``error`` key when the collection is empty or the lookup fails.
    """
    try:
        zot = ZoteroWrapper()
        entries = zot.collection_items(collection_key, limit=limit)
        if entries:
            summaries = [zot.format_item(entry) for entry in entries]
            return json.dumps(summaries, indent=2)
        # Nothing came back: answer with a hint instead of an empty list.
        empty_notice = {
            "error": "Collection is empty",
            "collection_key": collection_key,
            "suggestion": "Add some items to this collection in Zotero",
        }
        return json.dumps(empty_notice, indent=2)
    except Exception as exc:
        if ctx._fastmcp:
            # NOTE(review): called without `await`; if `Context.error` is a
            # coroutine function nothing is emitted here - confirm.
            ctx.error(f"Failed to fetch collection items {collection_key}. Message: {exc}")
        failure = {
            "error": f"Failed to fetch collection items. Message: {exc}",
            "collection_key": collection_key,
        }
        return json.dumps(failure, indent=2)
@mcp.tool(description="Get detailed information about a specific item in the library")
def get_item_details(item_key: str, *, ctx: Context) -> str:
    """
    Return the formatted details (abstract included) of one library item.

    Args:
        item_key: The paper's item key/ID
        ctx: MCP request context, used to report a failure.

    Returns:
        Indented JSON: the formatted item, or an object with an ``error`` key
        when the item is missing or the lookup fails.
    """
    try:
        zot = ZoteroWrapper()
        entry = zot.item(item_key)
        if entry:
            details = zot.format_item(entry, include_abstract=True)
            return json.dumps(details, indent=2)
        missing_notice = {
            "error": "Item not found",
            "item_key": item_key,
            "suggestion": "Verify the item exists and you have permission to access it",
        }
        return json.dumps(missing_notice, indent=2)
    except Exception as exc:
        if ctx._fastmcp:
            # NOTE(review): called without `await`; if `Context.error` is a
            # coroutine function nothing is emitted here - confirm.
            ctx.error(f"Failed to fetch item details {item_key}. Message: {exc}")
        failure = {
            "error": f"Failed to fetch item details. Message: {exc}",
            "item_key": item_key,
        }
        return json.dumps(failure, indent=2)
# FIXME: Misses local api endpoint in Zotero.
# @mcp.tool(description="Get fulltext as indexed by Zotero")
def get_item_fulltext(item_key: str, *, ctx: Context) -> str:
    """
    Return the full text that Zotero has indexed for an item, as JSON text.

    An item may have no indexed full text at all.

    Args:
        item_key: The paper's item key/ID
        ctx: MCP request context, used to report a failure.

    Returns:
        Indented JSON: the full-text payload from the client, or an object
        with an ``error`` key when nothing is indexed or the lookup fails.
    """
    try:
        zot = ZoteroWrapper()
        indexed_text = zot.fulltext_item(item_key)
        if indexed_text:
            return json.dumps(indexed_text, indent=2)
        nothing_indexed = {
            "error": "No fulltext found",
            "suggestion": "You need to index this file.",
        }
        return json.dumps(nothing_indexed, indent=2)
    except Exception as exc:
        if ctx._fastmcp:
            # NOTE(review): called without `await`; if `Context.error` is a
            # coroutine function nothing is emitted here - confirm.
            ctx.error(f"Retrieving fulltext failed. Message: {exc}")
        failure = {
            "error": f"Retrieving fulltext failed. Message: {exc}",
            "item_key": item_key,
        }
        return json.dumps(failure, indent=2)
def _file_uri_to_path(raw_uri) -> pathlib.Path:
    """Convert the file URI returned by `ZoteroWrapper.file_url` into a local path."""
    # Imported locally: the module only does `import urllib`, which does not
    # guarantee that the `parse` / `request` submodules are loaded.
    from urllib.parse import urlparse
    from urllib.request import url2pathname

    if isinstance(raw_uri, bytes):
        raw_uri = raw_uri.decode('utf-8', errors='replace')
    # Parse first and unquote only the path afterwards, so an encoded '#' or
    # '?' in a file name is not mistaken for a URL delimiter. `url2pathname`
    # does the platform-specific conversion: it keeps the leading '/' on POSIX
    # and turns '/C:/...' into a drive path on Windows.
    return pathlib.Path(url2pathname(urlparse(raw_uri).path))


# FIXME: Misses way to provide PDF to Claude
# @mcp.tool(description="Retrieve PDF for item in the library")
def get_item_pdf(item_key: str, attachment_index: int = 0, *, ctx: Context) -> EmbeddedResource | str:
    """
    Get the PDF content for a specific paper.

    Only child attachments whose content type is ``application/pdf`` are
    considered; by default the first of them is returned.

    Args:
        item_key: The paper's item key/ID
        attachment_index: Zero-based position within the item's PDF
            attachments (default 0). Negative values are rejected.
        ctx: MCP request context, used to report a failure.

    Returns:
        Indented JSON text. On success it is a ``resource`` object whose
        ``blob`` is the base64-encoded PDF; otherwise it is an object with an
        ``error`` key (no PDF attachment, invalid index, file missing on disk,
        or any other failure).
    """
    try:
        client = ZoteroWrapper()
        children = client.children(item_key)
        pdf_children = [
            child for child in children
            if child['data']['itemType'] == 'attachment'
            and child['data'].get('contentType') == 'application/pdf'
        ]
        # `index` is the position in the PDF-only list, i.e. exactly the value
        # a caller has to pass as `attachment_index`.
        pdf_attachments = [
            {
                'key': child['key'],
                'title': child['data'].get('title', 'Untitled'),
                'filename': child['data'].get('filename', 'Unknown'),
                'index': idx,
            }
            for idx, child in enumerate(pdf_children)
        ]
        if not pdf_attachments:
            return json.dumps({
                "error": "No PDF attachements found.",
                "item_key": item_key,
                "suggestion": "Check if this item has an attached PDF"
            }, indent=2)
        if not 0 <= attachment_index < len(pdf_attachments):
            # Also rejects negative values, which would otherwise silently
            # select an attachment counted from the end of the list.
            return json.dumps({
                "error": f"Invalid attachment index {attachment_index}",
                "item_key": item_key,
                "available_attachments": pdf_attachments,
                "suggestion": f"Choose an index between 0 and {len(pdf_attachments)-1}"
            }, indent=2)
        selected_attachment = pdf_attachments[attachment_index]
        pdf_path = _file_uri_to_path(client.file_url(selected_attachment['key']))
        try:
            with pdf_path.open('rb') as fp:
                pdf_content = fp.read()
        except FileNotFoundError:
            if ctx._fastmcp:
                # NOTE(review): called without `await`; if `Context.error` is
                # a coroutine function nothing is emitted here - confirm.
                ctx.error(f"PDF file not found at {pdf_path} for item {item_key}")
            return json.dumps({
                "error": "PDF file not found",
                "item_key": item_key,
                "path": str(pdf_path),
                "suggestion": "Check if the PDF file exists in the expected location"
            }, indent=2)
        # The resource is serialized by hand as JSON text rather than returned
        # as an `EmbeddedResource` object.
        return json.dumps({
            "type": "resource",
            "resource": {
                "uri": f"zotero://items/{item_key}/pdf",
                "mimeType": "application/pdf",
                "blob": base64.b64encode(pdf_content).decode()
            }
        }, indent=2)
    except Exception as e:
        if ctx._fastmcp:
            ctx.error(f"Failed to fetch PDF: {str(e)}")
        return json.dumps({
            "error": f"Failed to fetch PDF. {str(e)}",
            "item_key": item_key,
        }, indent=2)
@mcp.tool(description="Get tags used in the Zotero library")
def get_tags(limit: Optional[int] = None, *, ctx: Context) -> str:
    """Return the tags used in the Zotero library as JSON text.

    Args:
        limit: Optional maximum number of tags to request.
        ctx: MCP request context, used to report a failure.

    Returns:
        Indented JSON: the tags as returned by the client, or an object with
        an ``error`` key when there are none or the lookup fails.
    """
    try:
        zot = ZoteroWrapper()
        tag_list = zot.tags(limit=limit)
        if tag_list:
            return json.dumps(tag_list, indent=2)
        no_tags = {
            "error": "No tags found",
            "suggestion": "You need to create tags in your library",
        }
        return json.dumps(no_tags, indent=2)
    except Exception as exc:
        failure = f"Retrieving tags failed. Message: {exc}"
        if ctx._fastmcp:
            # NOTE(review): called without `await`; if `Context.error` is a
            # coroutine function nothing is emitted here - confirm.
            ctx.error(failure)
        return json.dumps({"error": failure}, indent=2)
@mcp.tool(description="Get recently added items (e.g. papers or attachements) to your library")
def get_recent(limit: Optional[int] = 10, *, ctx: Context) -> str:
    """Return the most recently added library items as JSON text.

    Args:
        limit: Number of items to return (default 10). A falsy value is
            treated as 10 and anything above 100 is capped at 100.
        ctx: MCP request context, used to report a failure.

    Returns:
        Indented JSON: a list of formatted items without abstracts, or an
        object with an ``error`` key when there are no items, a ``ValueError``
        is raised (reported as an invalid limit), or the lookup fails.
    """
    try:
        zot = ZoteroWrapper()
        # The limit may arrive as a string: coerce it, then cap it at 100.
        requested = int(limit or 10)
        capped = min(requested, 100)
        newest = zot.items(limit=capped, sort='dateAdded', direction='desc')
        if newest:
            summaries = [zot.format_item(entry, include_abstract=False) for entry in newest]
            return json.dumps(summaries, indent=2)
        nothing_recent = {
            "error": "No recent items found",
            "suggestion": "Add some items to your Zotero library first",
        }
        return json.dumps(nothing_recent, indent=2)
    except ValueError:
        bad_limit = {
            "error": "Invalid limit parameter",
            "suggestion": "Please provide a valid number for limit",
        }
        return json.dumps(bad_limit, indent=2)
    except Exception as exc:
        failure = f"Failed to fetch recent items: {exc}"
        if ctx._fastmcp:
            # NOTE(review): called without `await`; if `Context.error` is a
            # coroutine function nothing is emitted here - confirm.
            ctx.error(failure)
        return json.dumps({"error": failure}, indent=2)
@mcp.tool(description="Search the local Zotero library of the user.")
def search_library(query: str,
                   qmode: Literal["everything"] | Literal["titleCreatorYear"] = 'titleCreatorYear',
                   itemType: str = '-attachment',
                   limit: Optional[int] = None,
                   *, ctx: Context) -> str:
    """
    Search the whole Zotero library and return the matches as JSON text.

    Args:
        query: Search query; must contain something other than whitespace.
        qmode: Query mode, `titleCreatorYear` (default) or `everything`.
        itemType: Item type filter passed to the search (default
            `-attachment`, i.e. no attachments).
        limit: How many items to return (default unlimited)
        ctx: MCP request context, used to report a failure.

    Returns:
        Indented JSON: a list of formatted items without abstracts, or an
        object with an ``error`` key when the query is blank, nothing matches,
        or the search fails.
    """
    # Reject blank queries before touching the client.
    if query.strip() == "":
        return json.dumps({"error": "Search query is required"}, indent=2)

    try:
        zot = ZoteroWrapper()
        matches = zot.items(q=query, qmode=qmode, itemType=itemType, limit=limit)
        if len(matches) == 0:
            no_hits = {
                "error": "No results found",
                "query": query,
                "suggestion": "Try a different search term or verify your library contains matching items",
            }
            return json.dumps(no_hits, indent=2)
        summaries = [zot.format_item(match, include_abstract=False) for match in matches]
        return json.dumps(summaries, indent=2)
    except Exception as exc:
        if ctx._fastmcp:
            # NOTE(review): called without `await`; if `Context.error` is a
            # coroutine function nothing is emitted here - confirm.
            ctx.error(f"Search failed ({query}). Message: {exc}")
        failure = {
            "error": f"Search failed. Message: {exc}",
            "query": query,
        }
        return json.dumps(failure, indent=2)
if __name__ == "__main__":
    # Script entry point: start the MCP server using the stdio transport.
    mcp.run(transport="stdio")