1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
| import asyncio
import typing
import csv
import time
from pydantic import BaseModel
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
class Publication(BaseModel):
    """A single PubMed article.

    Only ``title`` and ``pubmed_url`` are known at construction time (taken
    from a search-results page); the remaining fields default to ``None`` and
    are filled in later by ``fullfill_publication()``.

    NOTE: the declaration order of the fields defines the CSV column order
    produced by ``save_publications()`` (it writes ``model_dump()`` keys), so
    do not reorder them casually.
    """
    title: str
    authors: typing.Optional[str] = None
    journal: typing.Optional[str] = None
    first_affiliation: typing.Optional[str] = None
    abstract: typing.Optional[str] = None
    pmid: typing.Optional[str] = None
    doi: typing.Optional[str] = None
    publish_date: typing.Optional[str] = None
    # Absolute URL of the article's detail page on pubmed.ncbi.nlm.nih.gov.
    pubmed_url: str
def save_publications(publications: typing.List[Publication], file: str):
    """Write *publications* to *file* as CSV.

    The header row comes from the first publication's ``model_dump()`` keys
    (i.e. the ``Publication`` field order), followed by one row per
    publication.

    Fix: an empty *publications* list used to raise ``IndexError`` on
    ``publications[0]``; now it just creates/truncates the file and returns.
    """
    with open(file, 'w', newline='', encoding='utf-8') as f:
        if not publications:
            # Nothing to write — leave an empty file rather than crashing.
            return
        writer = csv.writer(f)
        writer.writerow(publications[0].model_dump().keys())
        for publication in publications:
            writer.writerow(publication.model_dump().values())
async def fullfill_publication(publication: Publication, page: Page):
    """Fill in the detail fields of *publication* by visiting its PubMed page.

    Populates authors, journal, first_affiliation, abstract, pmid, doi and
    publish_date in place on *publication*, using *page* for navigation.

    Fix: every ``query_selector`` result used to be dereferenced without a
    None check, so any article missing one of the elements (e.g. no DOI, no
    English abstract) raised ``AttributeError``. Missing elements now simply
    leave the corresponding field as ``None``.
    """
    await page.goto(publication.pubmed_url)
    await page.wait_for_load_state('networkidle')

    async def _text(selector: str) -> typing.Optional[str]:
        # None-safe: returns the stripped text of the first match, or None
        # when the element is absent or empty.
        ele = await page.query_selector(selector)
        if ele is None:
            return None
        text = await ele.text_content()
        return text.strip() if text else None

    # Authors: join all full-name links from the author list.
    author_eles = await page.query_selector_all('span.authors-list-item a.full-name')
    publication.authors = ', '.join(
        [((await author_ele.text_content()) or '') for author_ele in author_eles]
    )

    # Journal: this element loads late, so keep the explicit wait.
    journal_ele = await page.wait_for_selector('#full-view-journal-trigger', timeout=15000)
    if journal_ele is not None:
        publication.journal = ((await journal_ele.text_content()) or '').strip()

    # First affiliation: expand the author section first, but only when the
    # toggle exists (single-author pages may not render it).
    toggle = page.locator('#toggle-authors')
    if await toggle.count():
        await toggle.click()
    first_affiliation = await _text('#full-view-affiliation-1')
    # [2:] drops the leading "1 " ordinal marker prefixed by PubMed.
    publication.first_affiliation = first_affiliation[2:] if first_affiliation else None

    # English abstract.
    publication.abstract = await _text('#eng-abstract')

    # PMID.
    publication.pmid = await _text('#full-view-identifiers strong.current-id')

    # DOI (absent for some articles).
    publication.doi = await _text('a[data-ga-action="DOI"]')

    # Publish date: the citation text up to the first ';'.
    cit = await _text('span.cit')
    publication.publish_date = cit.split(';')[0] if cit else None
async def run(query: str, *, size: int, save_to: str, batch_size: int = 10):
    """Scrape up to *size* PubMed results for *query* and save them as CSV.

    Search-result pages are walked 10 entries at a time, then article details
    are fetched concurrently in batches of *batch_size*, and the result is
    written to *save_to* via ``save_publications``.

    Fixes over the previous version:
    - detail pages created per publication were never closed (page leak);
    - ``size // 10`` silently dropped the remainder for sizes not divisible
      by 10 (now ceil-divides and trims the overshoot);
    - the query string was interpolated into the URL unencoded;
    - one failed detail page aborted the whole ``gather`` — now each failure
      is logged and the rest of the batch continues;
    - the browser context was never closed (now done in ``finally``).
    """
    from urllib.parse import quote_plus  # safe URL-encoding of the query

    base_url = 'https://pubmed.ncbi.nlm.nih.gov/'
    async with async_playwright() as p:
        browser: Browser = await p.chromium.launch(
            headless=True,
            # Disable the headless-automation fingerprint.
            args=['--disable-blink-features=AutomationControlled'],
        )
        context: BrowserContext = await browser.new_context(
            # Explicit viewport size.
            viewport={'width': 1920, 'height': 1080},
            # Spoof a regular desktop user agent to avoid bot detection.
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
        )
        try:
            page: Page = await context.new_page()
            print('开始获取文献基本信息(标题、详情地址)')
            publications = []
            # PubMed serves 10 results per page; ceil-divide so that a size
            # that is not a multiple of 10 still fetches the last partial page.
            for page_num in range(1, -(-size // 10) + 1):
                print(f'获取第{page_num}页文献基本信息(标题、详情地址)')
                # term   — search keywords
                # sort   — pubdate = sort by publication date
                # page   — page number
                # filter=simsearch1.fha  — free-full-text / abstract available
                # filter=datesearch.y_5  — restrict to the last 5 years
                await page.goto(
                    f'{base_url}?term={quote_plus(query)}&sort=pubdate&page={page_num}'
                    f'&filter=simsearch1.fha&filter=datesearch.y_5'
                )
                title_eles = await page.query_selector_all('article a.docsum-title')
                for title_ele in title_eles:
                    title = await title_ele.text_content()
                    href = await title_ele.get_attribute('href')
                    publications.append(Publication(
                        title=title.strip(),
                        # [1:] drops the leading '/' so the join with base_url
                        # does not produce a double slash.
                        pubmed_url=f'{base_url}{href.strip()[1:]}',
                    ))
            # Trim the overshoot introduced by the ceil division above.
            publications = publications[:size]
            print('获取文献基本信息(标题、详情地址)完成')

            async def _fetch_detail(publication: Publication):
                # One short-lived page per publication, always closed —
                # previously these pages leaked for the whole run.
                detail_page = await context.new_page()
                try:
                    await fullfill_publication(publication, detail_page)
                except Exception as e:
                    # Best-effort: one broken article page should not abort
                    # the whole batch.
                    print(f'获取 {publication.pubmed_url} 详情失败: {e}')
                finally:
                    await detail_page.close()

            # Fetch details concurrently, batch_size pages at a time.
            print('开始获取文献详细信息(作者、期刊、摘要、PMID、DOI、出版日期)')
            for i in range(0, len(publications), batch_size):
                await asyncio.gather(
                    *[_fetch_detail(publication) for publication in publications[i:i + batch_size]]
                )
            print('获取文献详细信息(作者、期刊、摘要、PMID、DOI、出版日期)完成')

            save_publications(publications, save_to)
        finally:
            await context.close()
            await browser.close()
if __name__ == '__main__':
    # Scrape 100 recent "maize" publications, 10 detail pages at a time,
    # and report the total wall-clock time taken.
    started = time.time()
    asyncio.run(run(
        'maize',
        size=100,
        save_to='publications_improved.csv',
        batch_size=10,
    ))
    elapsed = time.time() - started
    print(f'耗时:{elapsed} 秒')
|