"""
---------------------------------------------------------------------------
原项目: calibre-douban (基于 Apache-2.0 协议开源)
原地址: https://github.com/fugary/calibre-douban
原作者: fugary
---------------------------------------------------------------------------
修改声明 (Modification Notice):
本文件已由 [PumpkinSin] 进行修改以适应个人需求。
代码逻辑可能与原版存在较大差异。仅供个人学习和使用。
"""
import re
import time
import random
import gzip
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from queue import Queue, Empty
from urllib.parse import urlparse, unquote, urlencode
from urllib.request import Request, urlopen
from calibre import random_user_agent
from calibre.ebooks.metadata import check_isbn
from calibre.ebooks.metadata.book.base import Metadata
from calibre.ebooks.metadata.sources.base import Source, Option
from calibre.ebooks.BeautifulSoup import BeautifulSoup
from bs4 import Tag
# --- Douban endpoints -------------------------------------------------------
DOUBAN_BOOK_BASE = 'https://book.douban.com/'
DOUBAN_SEARCH_JSON_URL = 'https://www.douban.com/j/search'
DOUBAN_SEARCH_URL = 'https://www.douban.com/search'
# Template for a book detail page; interpolate the subject id with ``%``.
DOUBAN_BOOK_URL = "https://book.douban.com/subject/%s/"
# Value sent as the ``cat`` parameter of the search request.
DOUBAN_BOOK_CAT = '1001'
# Number of concurrent queries.
DOUBAN_CONCURRENCY_SIZE = 5
# Captures the numeric subject id from a ``.../subject/<id>/`` URL.
DOUBAN_BOOK_URL_PATTERN = re.compile(r'.*/subject/(\d+)/?')

# --- Plugin identity --------------------------------------------------------
PROVIDER_NAME = 'DoubanBooks'
PROVIDER_ID = 'doubanbook'
PROVIDER_VERSION = (2, 3, 0)
PROVIDER_AUTHOR = "Gary Fu"
class DoubanBookSearcher:
    """Search Douban for books and download their detail pages.

    Detail pages are fetched concurrently on an internal thread pool and
    handed to a ``DoubanBookHtmlParser`` to be turned into plain dicts.
    """

    def __init__(self, max_workers, douban_delay_enable, douban_login_cookie):
        """Create a searcher.

        :param max_workers: size of the thread pool; also used as the maximum
            number of search results whose detail pages are loaded.
        :param douban_delay_enable: when truthy, sleep a short random time
            before every detail-page request.
        :param douban_login_cookie: optional value sent as the ``Cookie``
            request header.
        """
        self.book_parser = DoubanBookHtmlParser()
        self.max_workers = max_workers
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='douban_async')
        self.douban_delay_enable = douban_delay_enable
        self.douban_login_cookie = douban_login_cookie

    def calc_url(self, href):
        """Return the book URL carried in the ``url`` query parameter of *href*.

        Returns ``None`` when *href* has no ``url`` parameter or when the
        decoded value does not look like a book detail URL.
        """
        params = {}
        for item in urlparse(href).query.split('&'):
            # partition() tolerates items without '=' (including the single
            # empty item produced by an empty query) instead of raising.
            key, sep, value = item.partition('=')
            if sep:
                params[key] = value
        target = params.get('url')
        if not target:
            return None
        url = unquote(target)
        if DOUBAN_BOOK_URL_PATTERN.match(url):
            return url
        return None

    def load_book_urls_new(self, query, log):
        """Run a Douban search for *query* and return the book URLs it lists.

        At most ``self.max_workers`` URLs are returned. An empty list is
        returned when the response status is not 200/201 or when the page is
        an access-denied page. Errors raised by ``urlopen`` propagate.
        """
        params = {"cat": DOUBAN_BOOK_CAT, "q": query}
        url = DOUBAN_SEARCH_URL + "?" + urlencode(params)
        log.info(f'Load books by search url: {url}')
        book_urls = []
        # The context manager closes the connection once the body is read.
        with urlopen(Request(url, headers=self.get_headers(), method='GET')) as res:
            if res.status not in (200, 201):
                return book_urls
            html_content = self.get_res_content(res)
        if self.is_prohibited(html_content, log):
            return book_urls
        html = BeautifulSoup(html_content)
        for link in html.select('a.nbg'):
            if len(book_urls) >= self.max_workers:
                break
            parsed = self.calc_url(link.get('href', ''))
            if parsed:
                book_urls.append(parsed)
        return book_urls

    def search_books(self, query, log):
        """Search for *query* and return the list of valid parsed books.

        Results are collected in completion order of the page downloads.
        An empty *query* yields an empty list without any network request.
        """
        if not query:
            return []
        book_urls = self.load_book_urls_new(query, log)
        futures = [self.thread_pool.submit(self.load_book, book_url, log) for book_url in book_urls]
        books = []
        for future in as_completed(futures):
            book = future.result()
            if self.is_valid_book(book):
                books.append(book)
        return books

    def load_book(self, url, log):
        """Download and parse the detail page at *url*.

        Returns the parsed book dict (which may still be invalid; check with
        ``is_valid_book``) or ``None`` when the download or parsing failed.
        Exceptions are logged, never raised, so this is safe to run on the
        thread pool.
        """
        book = None
        start_time = time.time()
        if self.douban_delay_enable:
            self.random_sleep(log)
        log.info(f"Requesting book page: {url}")
        try:
            with urlopen(Request(url, headers=self.get_headers(), method='GET')) as res:
                status = res.status
                log.info(f"Response status: {status}")
                if status not in (200, 201):
                    log.error(f"HTTP request failed with status: {status}")
                    return book
                book_detail_content = self.get_res_content(res)
            if self.is_prohibited(book_detail_content, log):
                log.error(f"Access prohibited for URL: {url}")
                return None
            log.info(f"Downloaded: {url} Successful, Time {(time.time() - start_time) * 1000:.0f}ms")
            try:
                book = self.book_parser.parse_book(url, book_detail_content)
                if not self.is_valid_book(book):
                    log.error("Parse book content error - Invalid book data")
                    log.error(f"Content preview: {book_detail_content[:500]}")
            except Exception as e:
                log.exception(f"Parse book content error: {e}")
                log.error(f"Content preview: {book_detail_content[:500]}")
        except Exception as e:
            log.exception(f"Failed to load book from {url}: {e}")
        return book

    def is_valid_book(self, book):
        """Return True when *book* is not None and has a non-empty title."""
        return book is not None and bool(book.get('title', None))

    def is_prohibited(self, html_content, log):
        """Return True when *html_content* is Douban's access-denied page."""
        # NOTE(review): the marker literal was split across two lines in the
        # previous revision (not valid Python); it may originally have
        # included surrounding markup. Confirm the exact marker upstream.
        prohibited = html_content is not None and '禁止访问' in html_content
        if prohibited:
            html = BeautifulSoup(html_content)
            html_content = html.select_one('div#content')
            log.info(f'Douban网页访问失败:{html_content}')
        return prohibited

    def get_res_content(self, res):
        """Read the body of response *res* and return it as text.

        A gzip ``Content-Encoding`` is decompressed. The charset declared by
        the response is used for decoding, falling back to UTF-8 when the
        response declares none; undecodable bytes are replaced.
        """
        raw = res.read()
        encoding = (res.info().get('Content-Encoding') or '').strip().lower()
        if encoding == 'gzip':
            raw = gzip.decompress(raw)
        charset = res.headers.get_content_charset() or 'utf-8'
        return raw.decode(charset, errors='replace')

    def get_headers(self):
        """Build the request headers, including the login cookie if set."""
        # Only gzip is advertised because it is the only content encoding
        # that get_res_content() can decode.
        headers = {'User-Agent': random_user_agent(), 'Accept-Encoding': 'gzip'}
        if self.douban_login_cookie:
            headers['Cookie'] = self.douban_login_cookie
        return headers

    def random_sleep(self, log):
        """Sleep for a random duration below 0.1 seconds."""
        random_sec = random.random() / 10
        log.info("Random sleep time {}s".format(random_sec))
        time.sleep(random_sec)
class DoubanBookHtmlParser:
    """Extract book metadata from the HTML of a Douban book detail page."""

    def __init__(self):
        self.id_pattern = DOUBAN_BOOK_URL_PATTERN
        # Matches the inline script assignment that carries the tag list.
        self.tag_pattern = re.compile("criteria = '(.+)'")

    def parse_book(self, url, book_content):
        """Parse *book_content* (HTML text) into a book dict.

        :param url: URL the page was loaded from; used as the book URL unless
            the page carries a non-empty ``data-url`` share link.
        :param book_content: HTML text of the detail page.
        :return: dict with the keys ``title``, ``url``, ``rating``,
            ``authors``, ``translators``, ``publisher``, ``description``,
            ``tags``, ``source`` and ``language``; ``id``, ``cover``,
            ``publishedDate``, ``isbn`` and ``series`` are present only when
            found on the page.
        """
        book = {}
        html = BeautifulSoup(book_content)
        if html is None or html.select is None:  # guard against an unusable document
            return None
        title_element = html.select("span[property='v:itemreviewed']")
        book['title'] = self.get_text(title_element)
        share_element = html.select("a[data-url]")
        if len(share_element):
            # Keep the request URL when the attribute is empty.
            url = share_element[0].get('data-url') or url
        book['url'] = url
        id_match = self.id_pattern.match(url)
        if id_match:
            book['id'] = id_match.group(1)
        img_element = html.select("a.nbg")
        if len(img_element):
            cover = img_element[0].get('href', '')
            if not cover or cover.endswith('update_image'):
                book['cover'] = ''
            else:
                book['cover'] = cover
        rating_element = html.select("strong[property='v:average']")
        book['rating'] = self.get_rating(rating_element)
        elements = html.select("span.pl")
        book['authors'] = []
        book['translators'] = []
        book['publisher'] = ''
        # Each "span.pl" is a field label; its value lives in the sibling
        # nodes or in the links of the label's parent.
        for element in elements:
            text = self.get_text(element)
            parent_ele = element.find_parent()
            if text.startswith("作者"):
                book['authors'].extend([self.get_text(author_element) for author_element in
                                        filter(self.author_filter, parent_ele.select("a"))])
            elif text.startswith("译者"):
                book['translators'].extend([self.get_text(translator_element) for translator_element in
                                            filter(self.author_filter, parent_ele.select("a"))])
            elif text.startswith("出版社"):
                book['publisher'] = self.get_tail(element)
            elif text.startswith("副标题"):
                book['title'] = book['title'] + ':' + self.get_tail(element)
            elif text.startswith("出版年"):
                book['publishedDate'] = self.get_tail(element)
            elif text.startswith("ISBN"):
                book['isbn'] = self.get_tail(element)
            elif text.startswith("丛书"):
                # find_next_sibling() may return None; get_text() handles it.
                book['series'] = self.get_text(element.find_next_sibling())
        summary_element = html.select("div#link-report div.intro")
        book['description'] = ''
        if len(summary_element):
            book['description'] = str(summary_element[-1])
        book['tags'] = self.get_tags(book_content)
        book['source'] = {
            "id": PROVIDER_ID,
            "description": PROVIDER_NAME,
            "link": DOUBAN_BOOK_BASE
        }
        book['language'] = self.get_book_language(book['title'])
        return book

    def get_book_language(self, title):
        """Guess the language code from *title*.

        Returns ``'en_US'`` when the title contains "英文版" or consists only
        of ASCII letters, hyphens and underscores; otherwise ``'zh_CN'``.
        """
        pattern = r'^[a-zA-Z\-_]+$'
        if title and ('英文版' in title or bool(re.match(pattern, title))):
            return 'en_US'
        return 'zh_CN'

    def get_tags(self, book_content):
        """Return the tags embedded in the page's ``criteria = '...'`` script.

        Only the ``|``-separated entries prefixed with ``7:`` are kept, with
        that prefix removed.
        """
        tag_match = self.tag_pattern.findall(book_content)
        if not tag_match:
            return []
        return [tag.replace('7:', '') for tag in tag_match[0].split('|')
                if tag and tag.startswith('7:')]

    def get_rating(self, rating_element):
        """Return half of the numeric rating text, or 0.0 if it is not a number."""
        try:
            return float(self.get_text(rating_element, '0')) / 2
        except ValueError:
            return 0.0

    def author_filter(self, a_element):
        """Return True for links that point at an author or search page."""
        a_href = a_element.get('href', '')
        return '/author' in a_href or '/search' in a_href

    def get_text(self, element, default_str=''):
        """Return the stripped text of *element*.

        *element* may be a Tag, a sequence whose first item is a Tag, or
        ``None``. *default_str* is returned when there is no text.
        """
        if element is None:
            return default_str
        text = default_str
        if isinstance(element, Tag):
            text = element.get_text(strip=True)
        elif len(element) and isinstance(element[0], Tag):
            text = element[0].get_text(strip=True)
        return text if text else default_str

    def get_tail(self, element, default_str=''):
        """Return the text that follows the label *element*.

        Sibling strings are concatenated until the first sibling Tag; if no
        text was collected by then, that Tag's own text is used instead.
        *default_str* is returned when nothing is found.
        """
        text = ''
        if isinstance(element, Tag) and element.next_siblings:
            for next_sibling in element.next_siblings:
                if isinstance(next_sibling, str):
                    text += next_sibling.strip()
                elif isinstance(next_sibling, Tag):
                    if not text:
                        text = self.get_text(next_sibling, default_str)
                    break
        return text if text else default_str
class NewDoubanBooks(Source):
    """calibre metadata source that reads book data and covers from Douban."""

    name = 'DoubanBooks'  # Name of the plugin
    description = 'Downloads metadata and covers from Douban Books web site.'
    supported_platforms = ['windows', 'osx', 'linux']  # Platforms this plugin will run on
    author = PROVIDER_AUTHOR  # The author of this plugin
    version = PROVIDER_VERSION  # The version number of this plugin
    minimum_calibre_version = (5, 0, 0)
    capabilities = frozenset(['identify', 'cover'])
    touched_fields = frozenset([
        'title', 'authors', 'tags', 'pubdate', 'comments', 'publisher',
        'identifier:isbn', 'rating', 'identifier:' + PROVIDER_ID
    ])  # language currently disabled
    book_searcher = None
    options = (
        # name, type, default, label, default, choices
        # type 'number', 'string', 'bool', 'choices'
        Option(
            'douban_concurrency_size', 'number', DOUBAN_CONCURRENCY_SIZE,
            _('Douban concurrency size:'),
            _('The number of douban concurrency cannot be too high!')
        ),
        Option(
            'add_translator_to_author', 'bool', True,
            _('Add translator to author'),
            _('If selected, translator will be written to metadata as author')
        ),
        Option(
            'douban_delay_enable', 'bool', True,
            _('douban random delay'),
            _('Random delay for a period of time before request')
        ),
        Option(
            'douban_search_with_author', 'bool', True,
            _('search with authors'),
            _('add authors to search keywords')
        ),
        Option(
            'douban_login_cookie', 'string', None,
            _('douban login cookie'),
            _('Browser cookie after login')
        ),
    )

    # (pattern, strptime format) pairs accepted for the published date.
    _PUBDATE_FORMATS = (
        (re.compile('^\\d{4}-\\d+$'), '%Y-%m'),
        (re.compile('^\\d{4}-\\d+-\\d+$'), '%Y-%m-%d'),
    )

    def __init__(self, *args, **kwargs):
        """Read the plugin preferences and create the shared book searcher."""
        Source.__init__(self, *args, **kwargs)
        try:
            concurrency_size = int(self.prefs.get('douban_concurrency_size'))
        except (TypeError, ValueError):
            concurrency_size = DOUBAN_CONCURRENCY_SIZE
        # ThreadPoolExecutor rejects a worker count below 1.
        concurrency_size = max(1, concurrency_size)
        douban_delay_enable = bool(self.prefs.get('douban_delay_enable'))
        douban_login_cookie = self.prefs.get('douban_login_cookie')
        self.douban_search_with_author = bool(self.prefs.get('douban_search_with_author'))
        self.book_searcher = DoubanBookSearcher(concurrency_size, douban_delay_enable, douban_login_cookie)

    def get_book_url(self, identifiers):
        """Return ``(id_type, id, url)`` for the Douban id in *identifiers*.

        Returns ``None`` when *identifiers* holds no Douban id.
        """
        douban_id = identifiers.get(PROVIDER_ID, None)
        if douban_id is not None:
            return PROVIDER_ID, douban_id, DOUBAN_BOOK_URL % douban_id
        return None

    def download_cover(
            self,
            log,
            result_queue,
            abort,
            title=None,
            authors=None,
            identifiers={},
            timeout=30,
            get_best_cover=False):
        """Download the cover and put ``(self, data)`` on *result_queue*.

        When no cover URL is cached for *identifiers*, ``identify`` is run
        first and the best-ranked result with a cached cover URL is used.
        *identifiers* is only read, never mutated.
        """
        cached_url = self.get_cached_cover_url(identifiers)
        if cached_url is None:
            log.info('No cached cover found, running identify')
            rq = Queue()
            self.identify(
                log,
                rq,
                abort,
                title=title,
                authors=authors,
                identifiers=identifiers,
                timeout=timeout
            )
            if abort.is_set():
                return
            results = []
            while True:
                try:
                    results.append(rq.get_nowait())
                except Empty:
                    break
            results.sort(
                key=self.identify_results_keygen(
                    title=title, authors=authors, identifiers=identifiers
                )
            )
            for mi in results:
                cached_url = self.get_cached_cover_url(mi.identifiers)
                if cached_url is not None:
                    break
        if cached_url is None:
            log.info('No cover found')
            return
        br = self.browser
        log('Downloading cover from:', cached_url)
        try:
            if self.book_searcher.douban_login_cookie:
                # Clone before adding headers so the shared browser is unchanged.
                br = br.clone_browser()
                br.set_current_header('Cookie', self.book_searcher.douban_login_cookie)
                br.set_current_header('Referer', DOUBAN_BOOK_BASE)
            cdata = br.open_novisit(cached_url, timeout=timeout).read()
            if cdata:
                result_queue.put((self, cdata))
        except Exception:
            log.exception('Failed to download cover from:', cached_url)

    def get_cached_cover_url(self, identifiers):
        """Return the cached cover URL for *identifiers*, or ``None``.

        The Douban id is used when present; otherwise it is looked up from
        the ISBN through the ISBN-to-identifier cache.
        """
        url = None
        db = identifiers.get(PROVIDER_ID, None)
        if db is None:
            isbn = identifiers.get('isbn', None)
            if isbn is not None:
                db = self.cached_isbn_to_identifier(isbn)
        if db is not None:
            url = self.cached_identifier_to_cover_url(db)
        return url

    def identify(
            self,
            log,
            result_queue,
            abort,
            title=None,
            authors=None,
            identifiers={},
            timeout=30):
        """Find matching books and put their ``Metadata`` on *result_queue*.

        With a Douban id in *identifiers* the detail page is loaded directly.
        Otherwise a search is run with the ISBN, or with the title (plus
        authors when that preference is enabled), falling back to a
        title-only search when the first search finds nothing.
        *identifiers* is only read, never mutated.
        """
        add_translator_to_author = self.prefs.get(
            'add_translator_to_author')
        log.info(f'Starting identify with identifiers: {identifiers}, title: {title}, authors: {authors}')
        isbn = check_isbn(identifiers.get('isbn', None))
        new_douban = self.get_book_url(identifiers)
        books = []
        if new_douban:
            # A Douban id is available: fetch that exact book directly.
            log.info(f'Found {PROVIDER_ID} identifier: {new_douban[1]}, URL: {new_douban[2]}')
            log.info(f'Attempting to load book directly from: {new_douban[2]}')
            book = self.book_searcher.load_book(new_douban[2], log)
            if self.book_searcher.is_valid_book(book):
                log.info(f'Successfully loaded book from douban ID: {book}')
                books.append(book)
            else:
                log.error(f'Failed to load valid book data from douban ID: {new_douban[1]}')
                log.error(f'Book data received: {book}')
        else:
            log.info('No douban identifier found, performing search')
            search_keyword = title
            if self.douban_search_with_author and title and authors:
                authors_str = ','.join(authors)
                search_keyword = f'{title} {authors_str}'
            query = isbn or search_keyword
            # Without an ISBN or title there is nothing meaningful to search.
            if query:
                books = self.book_searcher.search_books(query, log)
            if not len(books) and title and (isbn or search_keyword != title):
                # The ISBN or title+author search found nothing: retry with the title only.
                books = self.book_searcher.search_books(title, log)
        log.info(f'Total books found: {len(books)}')
        for book in books:
            ans = self.to_metadata(book, add_translator_to_author, log)
            if isinstance(ans, Metadata):
                db = ans.identifiers[PROVIDER_ID]
                if ans.isbn:
                    self.cache_isbn_to_identifier(ans.isbn, db)
                if ans.cover:
                    self.cache_identifier_to_cover_url(db, ans.cover)
                self.clean_downloaded_metadata(ans)
                result_queue.put(ans)

    def to_metadata(self, book, add_translator_to_author, log):
        """Convert a parsed *book* dict into a calibre ``Metadata`` object.

        Returns ``None`` when *book* is empty or carries no Douban id, so
        that one incomplete result cannot abort the whole identify run.
        """
        if not book:
            return None
        douban_id = book.get('id')
        if not douban_id:
            log.error('Skipping book without douban id: %r' % book.get('url'))
            return None
        authors = list(book.get('authors') or [])
        if add_translator_to_author:
            authors += book.get('translators') or []
        mi = Metadata(book['title'], authors)
        mi.identifiers = {PROVIDER_ID: douban_id}
        mi.url = book.get('url')
        mi.cover = book.get('cover', None)
        mi.publisher = book.get('publisher', '')
        pubdate = book.get('publishedDate', None)
        if pubdate:
            try:
                for pattern, date_format in self._PUBDATE_FORMATS:
                    if pattern.match(pubdate):
                        mi.pubdate = datetime.strptime(pubdate, date_format)
                        break
            except ValueError:
                log.error('Failed to parse pubdate %r' % pubdate)
        mi.comments = book.get('description', '')
        mi.tags = book.get('tags', [])
        mi.rating = book.get('rating', 0)
        mi.isbn = book.get('isbn', '')
        # A missing series is None rather than an empty list.
        mi.series = book.get('series') or None
        mi.language = book.get('language', 'zh_CN')
        log.info('parsed book', book)
        return mi
if __name__ == "__main__":
    # To run these tests use: calibre-debug -e ./__init__.py
    from calibre.ebooks.metadata.sources.test import (
        test_identify_plugin, title_test, authors_test
    )

    # Lookup by ISBN plus title; translators are expected among the authors.
    isbn_case = (
        {
            'identifiers': {'isbn': '9787111544937'},
            'title': '深入理解计算机系统(原书第3版)',
        },
        [
            title_test('深入理解计算机系统(原书第3版)', exact=True),
            authors_test(['randal e.bryant', "david o'hallaron", '贺莲', '龚奕利']),
        ],
    )
    # Lookup by title only; the result title is expected to include the subtitle.
    title_case = (
        {'title': '凤凰架构'},
        [
            title_test('凤凰架构:构建可靠的大型分布式系统', exact=True),
            authors_test(['周志明']),
        ],
    )
    test_identify_plugin(NewDoubanBooks.name, [isbn_case, title_case])