From 70960f7fbdc217d9dc01cd497a16bff591520e99 Mon Sep 17 00:00:00 2001 From: hsp <1561182770@163.com> Date: Tue, 23 Jan 2024 15:21:03 +0800 Subject: [PATCH] feat:Real-time data access --- .../templates/pg-init-data-configmap.yaml | 43 ++ pypi/data-processing/Dockerfile | 1 + .../db-scripts/init-database-schema.sql | 45 ++ pypi/data-processing/requirements.txt | 4 + .../src/common/log_tag_const.py | 2 + .../dp_document_image_db_operate.py | 236 +++++++ .../dp_document_web_url_db_operate.py | 263 ++++++++ .../src/utils/web_url_utils.py | 620 ++++++++++++++++++ 8 files changed, 1214 insertions(+) create mode 100644 pypi/data-processing/src/database_operate/dp_document_image_db_operate.py create mode 100644 pypi/data-processing/src/database_operate/dp_document_web_url_db_operate.py create mode 100644 pypi/data-processing/src/utils/web_url_utils.py diff --git a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml index 56e5cb198..ddd598a99 100644 --- a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml +++ b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml @@ -326,7 +326,50 @@ data: COMMENT ON COLUMN public.data_process_task_stage_log.update_datetime IS '更新时间'; COMMENT ON COLUMN public.data_process_task_stage_log.update_user IS '更新人'; COMMENT ON COLUMN public.data_process_task_stage_log.update_program IS '更新程序'; + + create table if not exists data_process_task_document_web_url + ( + id varchar(32) not null + constraint data_process_task_document_web_url_pkey + primary key, + task_id varchar(32), + document_id varchar(32), + level varchar(32), + web_url varchar(4096), + title varchar(1024), + description text, + content text, + content_clean text, + language varchar(32), + status varchar(4), + error_message text, + create_datetime varchar(32), + create_user varchar(32), + create_program varchar(64), + update_datetime varchar(32), + update_user varchar(32), + update_program varchar(32) + ); + create table if not exists data_process_task_document_image + ( + id varchar(32) not null + constraint data_process_task_document_image_pkey + primary key, + task_id varchar(32), + document_id varchar(512), + url varchar(1024), + image_path varchar(4096), + ocr_content text, + image_info text, + create_datetime varchar(32), + create_user varchar(32), + create_program varchar(64), + update_datetime varchar(32), + update_user varchar(32), + update_program varchar(32), + meta_info text + ); kind: ConfigMap metadata: diff --git a/pypi/data-processing/Dockerfile b/pypi/data-processing/Dockerfile index 41e4ce3a0..50e4c0068 100644 --- a/pypi/data-processing/Dockerfile +++ b/pypi/data-processing/Dockerfile @@ -28,6 +28,7 @@ WORKDIR /arcadia_app RUN chmod 777 /arcadia_app/entrypoint.sh RUN pip install -r requirements.txt +RUN playwright install && playwright install-deps ENTRYPOINT ["./entrypoint.sh"] diff --git a/pypi/data-processing/db-scripts/init-database-schema.sql b/pypi/data-processing/db-scripts/init-database-schema.sql index ba7ff471d..7a4d7d451 100644 --- a/pypi/data-processing/db-scripts/init-database-schema.sql +++ b/pypi/data-processing/db-scripts/init-database-schema.sql @@ -321,3 +321,48 @@ COMMENT ON COLUMN public.data_process_task_stage_log.update_datetime IS '更新时间'; COMMENT ON COLUMN public.data_process_task_stage_log.update_user IS '更新人'; COMMENT ON COLUMN public.data_process_task_stage_log.update_program IS '更新程序'; + + create table if not exists data_process_task_document_web_url + ( + id varchar(32) not null 
+            constraint data_process_task_document_web_url_pkey
+                primary key,
+        task_id varchar(32),
+        document_id varchar(32),
+        level varchar(32),
+        web_url varchar(4096),
+        title varchar(1024),
+        description text,
+        content text,
+        content_clean text,
+        language varchar(32),
+        status varchar(4),
+        error_message text,
+        create_datetime varchar(32),
+        create_user varchar(32),
+        create_program varchar(64),
+        update_datetime varchar(32),
+        update_user varchar(32),
+        update_program varchar(32)
+    );
+
+    create table if not exists data_process_task_document_image
+    (
+        id varchar(32) not null
+            constraint data_process_task_document_image_pkey
+                primary key,
+        task_id varchar(32),
+        document_id varchar(512),
+        url varchar(1024),
+        image_path varchar(4096),
+        ocr_content text,
+        image_info text,
+        create_datetime varchar(32),
+        create_user varchar(32),
+        create_program varchar(64),
+        update_datetime varchar(32),
+        update_user varchar(32),
+        update_program varchar(32),
+        meta_info text
+    );
+
diff --git a/pypi/data-processing/requirements.txt b/pypi/data-processing/requirements.txt
index a310da981..5c6d7cc52 100644
--- a/pypi/data-processing/requirements.txt
+++ b/pypi/data-processing/requirements.txt
@@ -22,3 +22,7 @@ opencc-python-reimplemented==0.1.7
 selectolax==0.3.17
 openai==1.3.7
 python-docx==1.1.0
+
+bs4==0.0.1
+playwright==1.40.0
+pillow==10.2.0
diff --git a/pypi/data-processing/src/common/log_tag_const.py b/pypi/data-processing/src/common/log_tag_const.py
index 71bd846e2..fe1722c33 100644
--- a/pypi/data-processing/src/common/log_tag_const.py
+++ b/pypi/data-processing/src/common/log_tag_const.py
@@ -40,3 +40,5 @@
 OPEN_AI = "Open AI"
 
 CONFIG = "Config"
+
+WEB_CRAWLING = "Web Url Utils"
diff --git a/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py b/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py
new file mode 100644
index 000000000..4b1c49a4a
--- /dev/null
+++ b/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py
@@ -0,0 +1,236 @@
+# Copyright 2023 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from database_clients import postgresql_pool_client
+from utils import date_time_utils
+
+
+def add(
+    req_json,
+    pool
+):
+    """Add a new record"""
+    now = date_time_utils.now_str()
+    user = req_json['creator']
+    program = '数据处理URL-新增'
+
+    params = {
+        'id': req_json['id'],
+        'document_id': req_json['document_id'],
+        'task_id': req_json['task_id'],
+        'url': req_json['url'],
+        'image_path': req_json['image_path'],
+        'ocr_content': req_json['ocr_content'],
+        'image_info': req_json['image_info'],
+        'meta_info': req_json['meta_info'],
+        'create_datetime': now,
+        'create_user': user,
+        'create_program': program,
+        'update_datetime': now,
+        'update_user': user,
+        'update_program': program
+    }
+
+    sql = """
+        insert into public.data_process_task_document_image (
+          id,
+          document_id,
+          task_id,
+          url,
+          image_path,
+          ocr_content,
+          image_info,
+          meta_info,
+          create_datetime,
+          create_user,
+          create_program,
+          update_datetime,
+          update_user,
+          update_program
+        )
+        values (
+          %(id)s,
+          %(document_id)s,
+          %(task_id)s,
+          %(url)s,
+          %(image_path)s,
+          %(ocr_content)s,
+          %(image_info)s,
+          %(meta_info)s,
+          %(create_datetime)s,
+          %(create_user)s,
+          %(create_program)s,
+          %(update_datetime)s,
+          %(update_user)s,
+          %(update_program)s
+        )
+    """.strip()
+
+    res = postgresql_pool_client.execute_update(pool, sql, params)
+    return res
+
+
+def update_by_id(
+    req_json,
+    pool
+):
+    """Update a record by id"""
+    now = date_time_utils.now_str()
+    user = req_json['creator']
+    program = '数据处理URL-更新'
+
+    params = {
+        'id': req_json['id'],
+        'document_id': req_json['document_id'],
+        'task_id': req_json['task_id'],
+        'url': req_json['url'],
+        'image_path': req_json['image_path'],
+        'ocr_content': req_json['ocr_content'],
+        'image_info': req_json['image_info'],
+        'meta_info': req_json['meta_info'],
+        'update_datetime': now,
+        'update_user': user,
+        'update_program': program
+    }
+
+    sql = """
+        update public.data_process_task_document_image set
+          url = %(url)s,
+          image_path = %(image_path)s,
+          ocr_content = %(ocr_content)s,
+          image_info = %(image_info)s,
+          meta_info = %(meta_info)s,
+          update_datetime = %(update_datetime)s,
+          update_user = %(update_user)s,
+          update_program = %(update_program)s
+        where id = %(id)s
+    """.strip()
+
+    res = postgresql_pool_client.execute_update(pool, sql, params)
+    return res
+
+
+def delete_by_id(
+    req_json,
+    pool
+):
+    """Delete a record by id"""
+    params = {
+        'id': req_json['id']
+    }
+
+    sql = """
+        delete from public.data_process_task_document_image
+        where
+          id = %(id)s
+    """.strip()
+
+    res = postgresql_pool_client.execute_update(pool, sql, params)
+    return res
+
+
+def info_by_id(
+    req_json,
+    pool
+):
+    """Get record detail by id"""
+    params = {
+        'id': req_json['id']
+    }
+
+    sql = """
+        select
+          id,
+          document_id,
+          task_id,
+          url,
+          image_path,
+          ocr_content,
+          image_info,
+          meta_info,
+          create_datetime,
+          create_user,
+          create_program,
+          update_datetime,
+          update_user,
+          update_program
+        from
+          public.data_process_task_document_image
+        where
+          id = %(id)s
+    """.strip()
+
+    res = postgresql_pool_client.execute_query(pool, sql, params)
+    return res
+
+
+def list_by_count(
+    req_json,
+    pool
+):
+    """Get the total count of records matching the url keyword"""
+    params = {
+        'keyword': '%' + req_json['url'] + '%'
+    }
+
+    sql = """
+        select
+          count(*)
+        from
+          public.data_process_task_document_image
+        where
+          url like %(keyword)s
+    """.strip()
+
+    res = postgresql_pool_client.execute_count_query(pool, sql, params)
+    return res
+
+
+def list_by_page(
+    req_json,
+    pool
+):
+    """Get the list data for url by page"""
params = { + 'keyword': '%' + req_json['url'] + '%', + 'pageIndex': int(req_json['pageIndex']), + 'pageSize': int(req_json['pageSize']) + } + + sql = """ + select + id, + document_id, + task_id, + url, + image_path, + ocr_content, + image_info, + meta_info, + create_datetime, + create_user, + create_program, + update_datetime, + update_user, + update_program + from + public.data_process_task_document_image + where + url like %(keyword)s + order by create_datetime desc + limit %(pageSize)s offset %(pageIndex)s + """.strip() + + res = postgresql_pool_client.execute_query(pool, sql, params) + return res diff --git a/pypi/data-processing/src/database_operate/dp_document_web_url_db_operate.py b/pypi/data-processing/src/database_operate/dp_document_web_url_db_operate.py new file mode 100644 index 000000000..ac48883c9 --- /dev/null +++ b/pypi/data-processing/src/database_operate/dp_document_web_url_db_operate.py @@ -0,0 +1,263 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from database_clients import postgresql_pool_client +from utils import date_time_utils + +def add( + req_json, + pool +): + """Add a new record""" + now = date_time_utils.now_str() + user = req_json['creator'] + program = '数据处理URL-新增' + + params = { + 'id': req_json['id'], + 'document_id': req_json['document_id'], + 'level': req_json['level'], + 'web_url': req_json['web_url'], + 'title': req_json['title'], + 'description': req_json['description'], + 'content': req_json['content'], + 'content_clean': req_json['content_clean'], + 'language': req_json['language'], + 'status': req_json['status'], + 'error_message': req_json['error_message'], + 'task_id': req_json['task_id'], + 'create_datetime': now, + 'create_user': user, + 'create_program': program, + 'update_datetime': now, + 'update_user': user, + 'update_program': program + } + + sql = """ + insert into public.data_process_task_document_web_url ( + id, + document_id, + level, + web_url, + title, + description, + content, + content_clean, + language, + status, + error_message, + task_id, + create_datetime, + create_user, + create_program, + update_datetime, + update_user, + update_program + ) + values ( + %(id)s, + %(document_id)s, + %(level)s, + %(web_url)s, + %(title)s, + %(description)s, + %(content)s, + %(content_clean)s, + %(language)s, + %(status)s, + %(error_message)s, + %(task_id)s, + %(create_datetime)s, + %(create_user)s, + %(create_program)s, + %(update_datetime)s, + %(update_user)s, + %(update_program)s + ) + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + + +def update_by_id( + req_json, + pool +): + """update a new record""" + now = date_time_utils.now_str() + user = req_json['creator'] + program = '数据处理URL-更新' + + params = { + 'id': req_json['id'], + 'document_id': req_json['document_id'], + 'level': req_json['level'], + 'web_url': req_json['web_url'], + 'title': req_json['title'], + 'description': req_json['description'], + 'content': req_json['content'], + 'content_clean': 
req_json['content_clean'], + 'language': req_json['language'], + 'status': req_json['status'], + 'error_message': req_json['error_message'], + 'task_id': req_json['task_id'], + 'update_datetime': now, + 'update_user': user, + 'update_program': program + } + + sql = """ + update public.data_process_task_document_web_url set + web_url = %(web_url)s, + title = %(title)s, + description = %(description)s, + content = %(content)s, + content_clean = %(content_clean)s, + language = %(language)s, + status = %(status)s, + error_message = %(error_message)s, + update_datetime = %(update_datetime)s, + update_user = %(update_user)s, + update_program = %(update_program)s + where id = %(id)s + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + + +def delete_by_id( + req_json, + pool +): + """delete a record""" + params = { + 'id': req_json['id'] + } + + sql = """ + delete from public.data_process_task_document_web_url + where + id = %(id)s + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + + +def info_by_id( + req_json, + pool +): + """info with id""" + params = { + 'id': req_json['id'] + } + + sql = """ + select + id, + document_id, + level, + web_url, + title, + description, + content, + content_clean, + language, + status, + error_message, + task_id, + create_datetime, + create_user, + create_program, + update_datetime, + update_user, + update_program + from + public.data_process_task_document_web_url + where + id = %(id)s + """.strip() + + res = postgresql_pool_client.execute_query(pool, sql, params) + return res + + +def list_by_count( + req_json, + pool +): + """Get count for the list url with page""" + params = { + 'keyword': '%' + req_json['web_url'] + '%' + } + + sql = """ + select + count(*) + from + public.data_process_task_document_web_url + where + web_url like %(keyword)s + """.strip() + + res = postgresql_pool_client.execute_count_query(pool, sql, params) + return res + + +def list_by_page( + req_json, + pool +): + """Get the list data for url by page""" + params = { + 'keyword': '%' + req_json['title'] + '%', + 'pageIndex': int(req_json['pageIndex']), + 'pageSize': int(req_json['pageSize']) + } + + sql = """ + select + id, + document_id, + level, + web_url, + title, + description, + content, + content_clean, + language, + status, + error_message, + task_id, + create_datetime, + create_user, + create_program, + update_datetime, + update_user, + update_program + from + public.data_process_task_document_web_url + where + title like %(keyword)s + order by create_datetime desc + limit %(pageSize)s offset %(pageIndex)s + """.strip() + + res = postgresql_pool_client.execute_query(pool, sql, params) + return res diff --git a/pypi/data-processing/src/utils/web_url_utils.py b/pypi/data-processing/src/utils/web_url_utils.py new file mode 100644 index 000000000..153c2c7ec --- /dev/null +++ b/pypi/data-processing/src/utils/web_url_utils.py @@ -0,0 +1,620 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import fnmatch +import logging +import time +import traceback +from io import BytesIO +from urllib.parse import urldefrag, urljoin, urlparse + +import requests +import ulid +from bs4 import BeautifulSoup +from PIL import Image +from playwright.sync_api import sync_playwright + +from common import log_tag_const +from utils import date_time_utils + +logger = logging.getLogger(__name__) + +def handle_website( + url, + interval_time=1, + resource_types=[], + max_depth=1, + max_count=100, + exclude_sub_urls=[], + include_sub_urls=[], + exclude_img_info={"weight": 250, "height" : 250} +): + """Recursively crawling the content, images, and other resources on a website + + Args: + url (str): The URL to crawl. + interval_time (int): The interval time between crawling web pages. + resource_types (list): Crawling the types of resource on a web page. + max_depth (int): The max depth of the recursive loading. + max_count (int): The max web count of the recursive loading. + exclude_sub_urls (list): Excluding subpage urls while crawling a website. + include_sub_urls (list): Including subpage urls while crawling a website. + exclude_img_info (dict): Excluding images with specified attributes (eg: weight、height) while crawling a website. + Returns: + [ + { + "id": "9D4oAQWh", + "pid": "0", + "url": "https://mp.weixin.qq.com/s/65DFY...", + "type": "page", + "level": "1" + } + ] + Examples: + { + "url": "https://mp.weixin.qq.com/s/9D4oAQWh-JjWis2MYEY_gQ", + "interval_time": 1, + "resource_types": [ + "png", + "jpg" + ], + "max_depth": 1, + "max_count": 100, + "exclude_sub_urls": [ + "" + ], + "include_sub_urls": [ + "" + ], + "exclude_img_info": { + "weight": 250, + "height": 250 + } + } + Raises: + """ + logger.debug(f"{log_tag_const.WEB_CRAWLING} start time {date_time_utils.now_str()}") + logger.debug( + "\n".join( + [ + f"{log_tag_const.WEB_CRAWLING} In parameter:", + f" url: {url}", + f" interval_time: {interval_time}" , + f" resource_types: {resource_types} " , + f" max_depth: {max_depth} ", + f" max_count: {max_count} ", + f" exclude_sub_urls: {exclude_sub_urls} ", + f" include_sub_urls: {include_sub_urls} ", + f" exclude_img_info: {exclude_img_info}" + ] + ) + ) + with sync_playwright() as p: + browser = p.chromium.launch() + context = browser.new_context(ignore_https_errors=True) + page = context.new_page() + contents = _handle_children_url( + page, + url, + interval_time, + resource_types, + max_depth, + max_count, + exclude_sub_urls, + include_sub_urls, + exclude_img_info + ) + context.close() + browser.close() + logger.debug(f"{log_tag_const.WEB_CRAWLING} end time {date_time_utils.now_str()}") + return contents + + +def _handle_children_url( + browser_page, + url, + interval_time, + resource_types, + max_depth, + max_count, + exclude_sub_urls, + include_sub_urls, + exclude_img_info +): + """Recursively crawling the children urls, images urls on a website + + Args: + page(Page): + url (str): The URL to crawl. + interval_time (int): The interval time between crawling web pages. + resource_types (list): Crawling the types of resource on a web page. + max_depth (int): The max depth of the recursive loading. + max_count (int): The max web count of the recursive loading. + exclude_sub_urls (list): Excluding subpage urls while crawling a website. + include_sub_urls (list): Including subpage urls while crawling a website. 
+ exclude_img_info (dict): Excluding images with specified attributes (eg: weight、height) while crawling a website. + Returns: + [ + { + "id": "9D4oAQWh", + "pid": "0", + "url": "https://mp.weixin.qq.com/s/65DFY...", + "type": "page", + "level": 1 + } + ] + Examples: + { + "brower_page": + "url": "https://mp.weixin.qq.com/s/9D4oAQWh-JjWis2MYEY_gQ", + "interval_time": 1, + "resource_types": [ + "png", + "jpg" + ], + "max_depth": 1, + "max_count": 100, + "exclude_sub_urls": [ + "" + ], + "include_sub_urls": [ + "" + ] + } + Raises: + """ + logger.debug(f"{log_tag_const.WEB_CRAWLING} Loading Root URL: {url}") + contents = [] + count = 1 + # 已爬取网页URL + visited = set() + children_urls = [ + { + "url": url, + "pid": "root" + } + ] + children_urls_bak = [] + for i in range(0, max_depth): + if len(children_urls) == 0: + logger.debug(f'{log_tag_const.WEB_CRAWLING} Crawling completed, no subpages found') + return contents + for children_url in children_urls: + id = ulid.ulid() + if count > max_count: + logger.debug( + f'{log_tag_const.WEB_CRAWLING} Crawling completed, exceeding the maximum limit for the number({max_count}) of web pages to be crawled' + ) + break + content = _handle_content( + browser_page, + children_url["url"], + resource_types, + exclude_sub_urls, + include_sub_urls, + exclude_img_info, + visited, + id, + i+1 + ) + if bool(content): + count = count + 1 + else: + continue + logger.debug(f"{log_tag_const.WEB_CRAWLING} Crawling completed, content {content}" ) + # 添加层级属性 + content["id"] = id + content["pid"] = children_url["pid"] + content["level"] = i + 1 + + if content["children_urls"] is not None: + for url in content['children_urls']: + children_urls_bak.append({ + "pid": id, + "url": url + } + ) + visited.add(children_url["url"]) + + contents.extend(content["images"]) + content.pop("images") + content.pop("children_urls") + contents.append(content) + # 时间间隔 + time.sleep(interval_time) + children_urls = [] + children_urls = children_urls_bak + children_urls_bak = [] + return contents + +def _handle_content( + browser_page, + url, + resource_types, + exclude_sub_urls, + include_sub_urls, + exclude_img_info, + visited, + id, + level +): + """Crawling the children urls, image urls on a URL + + Args: + browser_page(Page): Playwright page + url (str): The URL to crawl. + interval_time (int): The interval time between crawling web pages. + resource_types (list): Crawling the types of resource on a web page. + exclude_sub_urls (list): Excluding subpage urls while crawling a website. + include_sub_urls (list): Including subpage urls while crawling a website. + visited(list): A collection of crawled web pages + id(str): The current page ID of the image + level(int): The current page level of the image + exclude_img_info (dict): Excluding images with specified attributes (eg: weight、height) while crawling a website. + Returns: + { + "url": "https://mp.weixin.qq.com/s/9D4oAQWh-JjWis2MYEY_gQ" + "images": [ + { + "url": "https://mmbiz.qpic.cn/sz_mmbiz_png/KmXPKA19gW8nMO7tY8sbhvVXDj9SjZPCibOZMwwiauibXxTlwGr3ic1PEjEiaU69Xa2RMUuWnCvz99ZyDgb7OzEAR9w/640?wx_fmt=png&from=appmsg&wxfrom=5&wx_lazy=1&wx_co=1", + "id": "9D4oAQWh", + "pid": "0", + "type": "image", + "level": 1 + }, + ] + "children_urls": [ + "url": "https://mp.weixin.qq.com/s/9D4oAQ...." 
+        ]
+    }
+    Examples:
+    {
+        "url": "https://mp.weixin.qq.com/s/9D4oAQWh-JjWis2MYEY_gQ",
+        "interval_time": 1,
+        "resource_types": [
+            "png",
+            "jpg"
+        ],
+        "exclude_sub_urls": [
+            ""
+        ],
+        "include_sub_urls": [
+            ""
+        ]
+    }
+    Raises:
+    """
+    content = {}
+    content['url'] = url
+    content["type"] = "page"
+    children_urls = []
+    images = []
+    try:
+        for include_sub_url in include_sub_urls:
+            if not fnmatch.fnmatch(urlparse(url).path, include_sub_url):
+                logger.debug(f"{log_tag_const.WEB_CRAWLING} Not Include Loading Children URL : {url}")
+                return {}
+        for exclude_sub_url in exclude_sub_urls:
+            if fnmatch.fnmatch(urlparse(url).path, exclude_sub_url):
+                logger.debug(f"{log_tag_const.WEB_CRAWLING} Exclude Loading Children URL : {url}")
+                return {}
+        logger.debug(f"{log_tag_const.WEB_CRAWLING} Loading Children URL : {url}")
+        # Handle error response codes; the listener is registered before goto so the
+        # navigation response itself is captured.
+        def when_response(response):
+            if response.status >= 400 and response.request.url == url:
+                logger.error(
+                    "".join(
+                        [
+                            f"{log_tag_const.WEB_CRAWLING} Loading url {url} failure, ",
+                            f"status {response.status}, "
+                            f"error message {response.status_text}"
+                        ]
+                    )
+                )
+                content['status'] = response.status
+                content['error_message'] = response.status_text
+        browser_page.on('response', when_response)
+        browser_page.goto(url)
+        # Images
+        images = _handle_resource(
+            browser_page,
+            url,
+            resource_types,
+            exclude_img_info,
+            id,
+            level
+        )
+        visited.add(url)
+        all_links = [urljoin(url, link.get_attribute("href")) for link in browser_page.query_selector_all('//a')]
+        #child_links = [link for link in set(all_links) if link.startswith(url)]
+        child_links = [link for link in set(all_links) if link.startswith("http")]
+        # Remove fragments to avoid repetitions
+        defraged_child_links = [urldefrag(link).url for link in set(child_links)]
+        for link in set(defraged_child_links):
+            if link not in visited:
+                visited.add(link)
+                children_urls.append(link)
+        logger.debug(f"{log_tag_const.WEB_CRAWLING} Children urls: {children_urls}")
+    except Exception:
+        content["status"] = 400
+        content["error_message"] = traceback.format_exc()
+        logger.error(
+            ''.join(
+                [
+                    f"{log_tag_const.WEB_CRAWLING} Failed to crawl url\n",
+                    f"The tracing error is: \n{traceback.format_exc()}"
+                ]
+            )
+        )
+    content['children_urls'] = children_urls
+    content["images"] = images
+    return content
+
+
+def handle_content(
+    url
+):
+    """Crawling the content, and other resources on a URL
+    Args:
+        url (str): The URL to crawl.
+ Returns: + { + "url": "https://mp.weixin.qq.com/s/9D4oAQWh-JjWis2MYEY_gQ" + "title": "专为数据库打造:DB-GPT用私有化LLM技术定义数据库下一代交互方式", + "text": "2023 年 6 ...", + "html": " ...", + "description": "专为数据库打造:DB-GPT用私 ...", + "language": "en" + } + Examples: + { + "url": "https://mp.weixin.qq.com/s/9D4oAQWh-JjWis2MYEY_gQ", + } + Raises: + """ + logger.debug(f"{log_tag_const.WEB_CRAWLING} Loading Root URL Detail: {url}") + content = {} + try: + with sync_playwright() as p: + browser = p.chromium.launch() + context = browser.new_context(ignore_https_errors=True) + page = context.new_page() + page.goto(url) + innerHTML = page.content() + soup = BeautifulSoup(innerHTML, "html.parser") + # URL + content['source'] = url + # 标题 + title = soup.find('title').get_text() + content['title'] = title + # 源网页 + content["html"] = innerHTML + # 内容 + text = soup.get_text() + content['text'] = text + # 描述 + description = soup.find("meta", attrs={"name": "description"}) + if description is not None: + content["description"] = description.get("content", "No description found.") + # 语言 + language = soup.find("html").get("lang", "No language found.") + content["language"] = language + logger.debug(f"{log_tag_const.WEB_CRAWLING} Loading content: {content}") + + context.close() + browser.close() + return content + except Exception: + logger.error(''.join([ + f"{log_tag_const.WEB_CRAWLING} Excute crawling url failure\n", + f"The tracing error is: \n{traceback.format_exc()}" + ])) + return content + +def _handle_resource( + browser_page, + url, + resource_types, + exclude_img_info, + pid, + level +): + """Crawling images urls on a URL + Args: + brower_page(Page): Playwright page + url (str): The URL to crawl. + resource_types (list): Crawling the types of resource on a web page. + exclude_img_info (dict): Excluding images with specified attributes (eg: weight、height) while crawling a website. + pid(str): The parent page ID of the image + level(int): The parent page level of the image + Returns: + [ + { + "id": "", + "pid": "0", + "url": "https://mp.weixin.qq.com/s/9D4oAQW ...", + "type": "image", + "level": "1" + } + ] + Examples: + { + "page": Playwright Page + "url": "https://mp.weixin.qq.com/s/9D4oAQWh-JjWis2MYEY_gQ", + "resource_types": [ + "png", + "jpg" + ], + "pid": "0", + "level": "1" + } + Raises: + """ + resources = [] + if len(resource_types) == 0: + return resources + pic_src = browser_page.query_selector_all('//img') + for pic in pic_src: + image = {} + # 处理懒加载问题 + pic_url = pic.get_attribute('data-src') + if pic_url is None or pic_url == '': + pic_url = pic.get_attribute('src') + logger.debug(f"{log_tag_const.WEB_CRAWLING} Parse Image Url: {pic_url}") + if pic_url is None or pic_url == '': + continue + img_all_url = pic_url if pic_url.startswith('http') else urljoin(url, pic_url) + if not filter_image(img_all_url, resource_types, exclude_img_info): + continue + image["url"] = img_all_url + image["id"] = ulid.ulid() + image["pid"] = pid + image["type"] = "image" + image["level"] = level + resources.append(image) + return resources + + +def filter_image(url, resource_types, exclude_img_info): + """ Filter out ineligible images + Args: + url (str): The Image URL. + resource_types (list): Crawling the types of resource on a web page. + exclude_img_info (dict): Excluding images with specified attributes (eg: weight、height) while crawling a website. 
+ Returns: + Examples: + { + "url": "https://mp.weixin.qq.com/s/9D4oAQWh-JjWis2MYEY_gQ", + "resource_types": [ + "png", + "jpg" + ], + "exclude_img_info": { + "weight": 250, + "height": 250 + } + + } + Raises: + """ + try: + # 下载图片数据 + response = requests.get(url) + # 检查响应状态码是否为200,表示请求成功 + if response.status_code == 200: + # 将图片数据转换为BytesIO对象以便于PIL处理 + img_data = BytesIO(response.content) + # 使用PIL打开并识别图片类型 + try: + image = Image.open(img_data) + weight, height = image.size + if weight < exclude_img_info["weight"] and height < exclude_img_info["height"]: + logger.debug( + "".join( + [ + f"{log_tag_const.WEB_CRAWLING} Smaller than the size limit for crawled images\n", + f" Original weight: {weight}, Original height: {height}" + ] + ) + ) + return False + # 如果format无法获取,则默认为JPEG格式 + format = image.format or 'JPEG' + if format.lower() not in resource_types: + logger.debug(f"{log_tag_const.WEB_CRAWLING} Not within the range of resource types to be crawled") + return False + return True + except IOError: + logger.error( + ''.join( + [ + f"{log_tag_const.WEB_CRAWLING} Unable to recognize or open the image\n", + f"The tracing error is: \n{traceback.format_exc()}" + ] + ) + ) + else: + logger.error(f"Request failed, HTTP status code:{response.status_code}") + return False + except Exception: + logger.error(''.join([ + f"{log_tag_const.WEB_CRAWLING} Excute Request Image Failure\n", + f"The tracing error is: \n{traceback.format_exc()}" + ])) + return False + +def download_and_save_image(url, image_id): + """ Save Image to Local + Args: + url (str): Image URL. + image_id (str): Image ID. + Returns: + Examples: + { + "url": "https://mp.weixin.qq.com/s/9D4oAQWh-JjWis2MYEY_gQ", + "img_id": "9D4oAQW..." + } + Raises: + """ + logger.debug(f"Download Image URL : {url}") + try: + # 下载图片数据 + response = requests.get(url) + # 检查响应状态码是否为200,表示请求成功 + if response.status_code == 200: + # 将图片数据转换为BytesIO对象以便于PIL处理 + img_data = BytesIO(response.content) + # 使用PIL打开并识别图片类型 + try: + image = Image.open(img_data) + format = image.format or 'JPEG' # 如果format无法获取,则默认为JPEG格式 + except IOError: + logger.error( + ''.join( + [ + f"{log_tag_const.WEB_CRAWLING} Unable to recognize or open the image\n", + f"The tracing error is: \n{traceback.format_exc()}" + ] + ) + ) + return None + output_filename = f"{image_id}.{format.lower()}" + # 保存图片到本地 + try: + image.save(output_filename) + logger.debug(f"{log_tag_const.WEB_CRAWLING} The image has been successfully downloaded and saved as:{output_filename}") + return output_filename + except Exception: + logger.error( + ''.join( + [ + f"{log_tag_const.WEB_CRAWLING} Excute save the image failure\n", + f"The tracing error is: \n{traceback.format_exc()}" + ] + ) + ) + return None + else: + logger.error(f"{log_tag_const.WEB_CRAWLING} Request failed, HTTP status code:{response.status_code}") + return None + except Exception: + logger.error( + ''.join( + [ + f"{log_tag_const.WEB_CRAWLING} Excute Request Image Failure\n", + f"The tracing error is: \n{traceback.format_exc()}" + ] + ) + ) + return None
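
Reviewer note: below is a minimal usage sketch of the new crawler utilities, for illustration only. The URL, the parameter values, and the calling context are assumptions made for this sketch and are not shipped by the patch; the import path follows the repository's src layout, where modules import as `from utils import ...`.

# Illustrative sketch only (not part of the patch): crawl a site one level deep,
# then fetch page detail and download qualifying images.
from utils import web_url_utils

# Placeholder URL and limits; exclude_img_info uses the patch's "weight"/"height" keys.
items = web_url_utils.handle_website(
    url="https://example.com/docs",
    interval_time=1,
    resource_types=["png", "jpg"],
    max_depth=1,
    max_count=100,
    exclude_sub_urls=[],
    include_sub_urls=[],
    exclude_img_info={"weight": 250, "height": 250},
)

for item in items:
    if item["type"] == "page":
        # Returns source/title/html/text/description/language for the page.
        detail = web_url_utils.handle_content(item["url"])
    elif item["type"] == "image":
        # Saves the image locally as <id>.<format>.
        web_url_utils.download_and_save_image(item["url"], item["id"])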