-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathannas_archive.py
197 lines (166 loc) · 7.52 KB
/
annas_archive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from contextlib import closing
from http.client import RemoteDisconnected
from math import ceil
from typing import Generator
from urllib.error import HTTPError, URLError
from urllib.parse import quote_plus
from urllib.request import urlopen, Request
from calibre import browser
from calibre.gui2 import open_url
from calibre.gui2.store import StorePlugin
from calibre.gui2.store.search_result import SearchResult
from calibre.gui2.store.web_store_dialog import WebStoreDialog
from calibre_plugins.store_annas_archive.constants import DEFAULT_MIRRORS, RESULTS_PER_PAGE, SearchOption
from lxml import html
try:
from qt.core import QUrl
except (ImportError, ModuleNotFoundError):
from PyQt5.Qt import QUrl
SearchResults = Generator[SearchResult, None, None]
class AnnasArchiveStore(StorePlugin):
def __init__(self, gui, name, config=None, base_plugin=None):
super().__init__(gui, name, config, base_plugin)
self.working_mirror = None
def _search(self, url: str, max_results: int, timeout: int) -> SearchResults:
br = browser()
doc = None
counter = max_results
for page in range(1, ceil(max_results / RESULTS_PER_PAGE) + 1):
mirrors = self.config.get('mirrors', DEFAULT_MIRRORS)
if self.working_mirror is not None:
mirrors.remove(self.working_mirror)
mirrors.insert(0, self.working_mirror)
for mirror in mirrors:
with closing(br.open(url.format(base=mirror, page=page), timeout=timeout)) as resp:
if resp.code < 500 or resp.code > 599:
self.working_mirror = mirror
doc = html.fromstring(resp.read())
break
if doc is None:
self.working_mirror = None
raise Exception('No working mirrors of Anna\'s Archive found.')
books = doc.xpath('//table/tr')
for book in books:
if counter <= 0:
break
columns = book.findall("td")
s = SearchResult()
cover = columns[0].xpath('./a[@tabindex="-1"]')
if cover:
cover = cover[0]
else:
continue
s.detail_item = cover.get('href', '').split('/')[-1]
if not s.detail_item:
continue
s.cover_url = ''.join(cover.xpath('(./span/img/@src)[1]'))
s.title = ''.join(columns[1].xpath('./a/span/text()'))
s.author = ''.join(columns[2].xpath('./a/span/text()'))
s.formats = ''.join(columns[9].xpath('./a/span/text()')).upper()
s.price = '$0.00'
s.drm = SearchResult.DRM_UNLOCKED
counter -= 1
yield s
def search(self, query, max_results=10, timeout=60) -> SearchResults:
url = f'{{base}}/search?page={{page}}&q={quote_plus(query)}&display=table'
search_opts = self.config.get('search', {})
for option in SearchOption.options:
value = search_opts.get(option.config_option, ())
if isinstance(value, str):
value = (value,)
for item in value:
url += f'&{option.url_param}={item}'
yield from self._search(url, max_results, timeout)
def open(self, parent=None, detail_item=None, external=False):
if detail_item:
url = self._get_url(detail_item)
else:
if self.working_mirror is not None:
url = self.working_mirror
else:
url = self.config.get('mirrors', DEFAULT_MIRRORS)[0]
if external or self.config.get('open_external', False):
open_url(QUrl(url))
else:
d = WebStoreDialog(self.gui, self.working_mirror, parent, url)
d.setWindowTitle(self.name)
d.set_tags(self.config.get('tags', ''))
d.exec()
def get_details(self, search_result: SearchResult, timeout=60):
if not search_result.formats:
return
_format = '.' + search_result.formats.lower()
link_opts = self.config.get('link', {})
url_extension = link_opts.get('url_extension', True)
content_type = link_opts.get('content_type', False)
br = browser()
with closing(br.open(self._get_url(search_result.detail_item), timeout=timeout)) as f:
doc = html.fromstring(f.read())
for link in doc.xpath('//div[@id="md5-panel-downloads"]/ul[contains(@class, "list-inside")]/li/a[contains(@class, "js-download-link")]'):
url = link.get('href')
link_text = ''.join(link.itertext())
if link_text == 'Libgen.li':
url = self._get_libgen_link(url, br)
elif link_text == 'Libgen.rs Fiction' or link_text == 'Libgen.rs Non-Fiction':
url = self._get_libgen_nonfiction_link(url, br)
elif link_text.startswith('Sci-Hub'):
url = self._get_scihub_link(url, br)
elif link_text == 'Z-Library':
url = self._get_zlib_link(url, br)
else:
continue
if not url:
continue
# Takes longer, but more accurate
if content_type:
try:
with urlopen(Request(url, method='HEAD'), timeout=timeout) as resp:
if resp.info().get_content_maintype() != 'application':
continue
except (HTTPError, URLError, TimeoutError, RemoteDisconnected):
pass
elif url_extension:
# Speeds it up by checking the extension of the url.
# Might miss a direct url that doesn't end with the extension
params = url.find("?")
if params < 0:
params = None
if url.endswith(_format, 0, params):
continue
search_result.downloads[f"{link_text}.{search_result.formats}"] = url
@staticmethod
def _get_libgen_link(url: str, br) -> str:
with closing(br.open(url)) as resp:
doc = html.fromstring(resp.read())
scheme, _, host, _ = resp.geturl().split('/', 3)
url = ''.join(doc.xpath('//a[h2[text()="GET"]]/@href'))
return f"{scheme}//{host}/{url}"
@staticmethod
def _get_libgen_nonfiction_link(url: str, br) -> str:
with closing(br.open(url)) as resp:
doc = html.fromstring(resp.read())
url = ''.join(doc.xpath('//h2/a[text()="GET"]/@href'))
return url
@staticmethod
def _get_scihub_link(url, br):
with closing(br.open(url)) as resp:
doc = html.fromstring(resp.read())
scheme, _ = resp.geturl().split('/', 1)
url = ''.join(doc.xpath('//embed[@id="pdf"]/@src'))
if url:
return scheme + url
@staticmethod
def _get_zlib_link(url, br):
with closing(br.open(url)) as resp:
doc = html.fromstring(resp.read())
scheme, _, host, _ = resp.geturl().split('/', 3)
url = ''.join(doc.xpath('//a[contains(@class, "addDownloadedBook")]/@href'))
if url:
return f"{scheme}//{host}/{url}"
def _get_url(self, md5):
return f"{self.working_mirror}/md5/{md5}"
def config_widget(self):
from calibre_plugins.store_annas_archive.config import ConfigWidget
return ConfigWidget(self)
def save_settings(self, config_widget):
config_widget.save_settings()