diff --git a/docs/images/data-process.drawio b/docs/images/data-process.drawio
new file mode 100644
index 000000000..df5ea5cb7
--- /dev/null
+++ b/docs/images/data-process.drawio
@@ -0,0 +1,109 @@
diff --git a/docs/images/data-process.drawio.png b/docs/images/data-process.drawio.png
new file mode 100644
index 000000000..7f2bb48cf
Binary files /dev/null and b/docs/images/data-process.drawio.png differ
diff --git a/pypi/data-processing/README.md b/pypi/data-processing/README.md
index 8ff1e59ca..258d8f442 100644
--- a/pypi/data-processing/README.md
+++ b/pypi/data-processing/README.md
@@ -19,7 +19,7 @@ The data processing process includes: cleaning abnormal data, filtering, de-dupl
 
 ## Design
 
-![Design](../assets/data_process.drawio.png)
+![Design](../../docs/images/data-process.drawio.png)
 
 ## Local Development
 ### Software Requirements
diff --git a/pypi/data-processing/requirements.txt b/pypi/data-processing/requirements.txt
index 15a97a6f6..20a366d01 100644
--- a/pypi/data-processing/requirements.txt
+++ b/pypi/data-processing/requirements.txt
@@ -6,7 +6,7 @@ aiohttp==3.8.6
 ulid==1.1
 minio==7.1.17
 zhipuai==1.0.7
-langchain==0.0.336
+langchain==0.0.354
 spacy==3.5.4
 pypdf==3.17.1
 emoji==2.2.0
diff --git a/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py b/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py
index a8bc57c30..36e77b1d5 100644
--- a/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py
+++ b/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py
@@ -15,6 +15,7 @@
 from database_clients import postgresql_pool_client
 from utils import date_time_utils
 
+
 def add(
     req_json,
     pool
diff --git a/pypi/data-processing/src/database_operate/dp_document_web_url_db_operate.py b/pypi/data-processing/src/database_operate/dp_document_web_url_db_operate.py
index 003009d8e..f665ab19b 100644
--- a/pypi/data-processing/src/database_operate/dp_document_web_url_db_operate.py
+++ b/pypi/data-processing/src/database_operate/dp_document_web_url_db_operate.py
@@ -16,6 +16,7 @@
 from database_clients import postgresql_pool_client
 from utils import date_time_utils
 
+
 def add(
     req_json,
     pool
diff --git a/pypi/data-processing/src/document_loaders/async_playwright.py b/pypi/data-processing/src/document_loaders/async_playwright.py
index 0fd6272da..f7e6ae6ea 100644
--- a/pypi/data-processing/src/document_loaders/async_playwright.py
+++ b/pypi/data-processing/src/document_loaders/async_playwright.py
@@ -13,13 +13,17 @@
 # limitations under the License.
 
 import logging
+import time
+import traceback
 from typing import List
 
-from langchain_community.document_loaders.base import BaseLoader
+import playwright
 from langchain_community.document_transformers import Html2TextTransformer
 from langchain_core.documents import Document
+from playwright.async_api import async_playwright
 
 from common import log_tag_const
+from document_loaders.base import BaseLoader
 
 logger = logging.getLogger(__name__)
 
@@ -32,7 +36,7 @@ def __init__(
         url: str,
         max_count: int = 100,
         max_depth: int = 1,
-        interval_time: int = 1,
+        interval_time: int = 1000,
     ):
         """
         Initialize the loader with a list of URL paths.
@@ -46,18 +50,17 @@
         Raises:
             ImportError: If the required 'playwright' package is not installed.
""" - self.url = url - self.max_count = max_count - self.max_depth = max_depth - self.interval_time = interval_time - - try: - import playwright - except ImportError: - raise ImportError( - "playwright is required for AsyncPlaywrightLoader. " - "Please install it with `pip install playwright`." - ) + if max_count is None: + max_count = 100 + if max_depth is None: + max_depth = 1 + if interval_time is None: + interval_time = 1000 + + self._url = url + self._max_count = max_count + self._max_depth = max_depth + self._interval_time = interval_time / 1000 async def ascrape_playwright(self, url: str) -> str: """ @@ -70,7 +73,6 @@ async def ascrape_playwright(self, url: str) -> str: str: The scraped HTML content or an error message if an exception occurs. """ - from playwright.async_api import async_playwright logger.info("Starting scraping...") results = "" @@ -121,20 +123,18 @@ async def get_all_url(self): "".join( [ f"{log_tag_const.WEB_CRAWLING} Get all url in a web page\n", - f" url: {self.url}" + f" url: {self._url}" ] ) ) - all_url = [self.url] - sub_urls = [self.url] - + all_url = [self._url] + sub_urls = [self._url] try: - for i in range(1, self.max_depth): + for i in range(1, self._max_depth): for sub_url in sub_urls: children_urls = await self._get_children_url( url=sub_url, - max_count=self.max_count, url_count=len(all_url) ) @@ -147,12 +147,12 @@ async def get_all_url(self): all_url = list(unique_urls) # 如果达到最大数量限制,直接返回 - if res.get("url_count") >= self.max_count: + if res.get("url_count") >= self._max_count: logger.info( "".join( [ f"{log_tag_const.WEB_CRAWLING} The number of URLs has reached the upper limit.\n", - f" max_count: {self.max_count}\n" + f" max_count: {self._max_count}\n" ] ) ) @@ -160,8 +160,8 @@ async def get_all_url(self): sub_urls = res.get("children_url") # 时间间隔 - logger.info(f"{log_tag_const.WEB_CRAWLING} Wait for {self.interval_time} seconds before continuing the visit.") - time.sleep(self.interval_time) + logger.info(f"{log_tag_const.WEB_CRAWLING} Wait for {self._interval_time} seconds before continuing the visit.") + time.sleep(self._interval_time) return all_url except Exception: logger.error( @@ -188,7 +188,7 @@ async def _get_children_url(self, url, url_count): [ f"{log_tag_const.WEB_CRAWLING} Get sub url in a web page\n", f" url: {url}\n", - f" max_count: {self.max_count}\n", + f" max_count: {self._max_count}\n", f" url_count: {url_count}" ] ) @@ -209,12 +209,12 @@ async def _get_children_url(self, url, url_count): for link in links: href = await link.get_attribute('href') # 需要抓取的url数量不得超过最大数量 - if url_count >= self.max_count: + if url_count >= self._max_count: logger.info( "".join( [ f"{log_tag_const.WEB_CRAWLING} The number of URLs has reached the upper limit.\n", - f" max_count: {self.max_count}\n", + f" max_count: {self._max_count}\n", f" url_count: {url_count}" ] ) diff --git a/pypi/data-processing/src/document_loaders/base.py b/pypi/data-processing/src/document_loaders/base.py index a31e22eb5..7830efd4a 100644 --- a/pypi/data-processing/src/document_loaders/base.py +++ b/pypi/data-processing/src/document_loaders/base.py @@ -17,6 +17,7 @@ from langchain_core.documents import Document + class BaseLoader(ABC): """Interface for Document Loader.