Integrate Crawl4Ai to scrape web content #383

Merged: 4 commits, Nov 8, 2024

backend/Dockerfile: 5 changes (3 additions & 2 deletions)
@@ -1,4 +1,4 @@
FROM --platform=linux/amd64 public.ecr.aws/docker/library/python:3.11
FROM --platform=linux/amd64 python:3.11

# Required for opencv
RUN apt-get update -y && \
@@ -14,7 +14,8 @@ COPY backend/vectordb.requirements.txt /tmp/vectordb.requirements.txt

# Install Python packages
RUN python3 -m pip install -U pip setuptools wheel uv && \
python3 -m uv pip install --no-cache-dir -r /tmp/requirements.txt --index-strategy unsafe-any-match
python3 -m uv pip install --no-cache-dir -r /tmp/requirements.txt --index-strategy unsafe-any-match && \
playwright install --with-deps

# Install VectorDB packages
ARG ADD_VECTORDB=0
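
The new `playwright install --with-deps` step downloads Playwright's browser binaries together with the system libraries they need, presumably for the Crawl4AI/WebParser integration, so no first-run download happens inside the container. A hedged sanity-check sketch (not part of this PR), assuming the `playwright` Python package is pulled in via requirements.txt:

# Verify at container start-up that the browsers installed by
# `playwright install --with-deps` are actually usable.
import asyncio

from playwright.async_api import async_playwright


async def check_browser() -> None:
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto("https://example.com")
        print(await page.title())
        await browser.close()


asyncio.run(check_browser())
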
backend/indexer/indexer.py: 2 changes (1 addition & 1 deletion)
@@ -163,7 +163,7 @@ async def _sync_data_source_to_collection(
data_ingestion_mode=inputs.data_ingestion_mode,
)

for loaded_data_points_batch in loaded_data_points_batch_iterator:
async for loaded_data_points_batch in loaded_data_points_batch_iterator:
try:
await ingest_data_points(
inputs=inputs,
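
Since the loaders now return async generators rather than plain iterators, the indexer has to iterate with `async for`; a plain `for` over an async generator raises a TypeError. A small self-contained illustration of the consuming side (names are illustrative, not taken from the PR):

import asyncio
from typing import AsyncGenerator, List


async def batch_iterator() -> AsyncGenerator[List[str], None]:
    # Stand-in for load_filtered_data: each batch becomes available only after
    # some awaited I/O (HTTP requests, file downloads, ...).
    for batch in (["a", "b"], ["c"]):
        await asyncio.sleep(0)
        yield batch


async def ingest_all() -> None:
    async for batch in batch_iterator():  # mirrors the indexer's new loop
        print("ingesting", batch)


asyncio.run(ingest_all())
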
backend/modules/dataloaders/loader.py: 14 changes (7 additions & 7 deletions)
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Dict, Iterator, List
from typing import AsyncGenerator, Dict, List

from backend.types import DataIngestionMode, DataSource, LoadedDataPoint

@@ -36,7 +36,7 @@ class BaseDataLoader(ABC):
Base data loader class. Data loader is responsible for detecting, filtering and then loading data points to be ingested.
"""

def load_full_data(
async def load_full_data(
self,
data_source: DataSource,
dest_dir: str,
@@ -51,15 +51,15 @@ def load_full_data(
Returns:
None
"""
return self.load_filtered_data(
return await self.load_filtered_data(
data_source,
dest_dir,
previous_snapshot={},
batch_size=batch_size,
data_ingestion_mode=DataIngestionMode.FULL,
)

def load_incremental_data(
async def load_incremental_data(
self,
data_source: DataSource,
dest_dir: str,
@@ -76,7 +76,7 @@ def load_incremental_data(
Returns:
None
"""
return self.load_filtered_data(
return await self.load_filtered_data(
data_source,
dest_dir,
previous_snapshot,
@@ -85,14 +85,14 @@ def load_incremental_data(
)

@abstractmethod
def load_filtered_data(
async def load_filtered_data(
self,
data_source: DataSource,
dest_dir: str,
previous_snapshot: Dict[str, str],
batch_size: int,
data_ingestion_mode: DataIngestionMode,
) -> Iterator[List[LoadedDataPoint]]:
) -> AsyncGenerator[List[LoadedDataPoint], None]:
"""
Sync the data source, filter data points and load them from the source to the destination directory.
This method returns the loaded data points in batches as an iterator.
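
The new return annotation `AsyncGenerator[List[LoadedDataPoint], None]` reads as "asynchronously yields lists of LoadedDataPoint, accepts nothing via asend()". A method satisfies it simply by containing `yield` inside an `async def`, which lets each concrete loader interleave awaited I/O with batch emission. A rough sketch of the batching pattern the loaders follow (simplified types, illustrative names):

import asyncio
from typing import AsyncGenerator, Dict, List


async def load_filtered_data_sketch(
    snapshot: Dict[str, str], batch_size: int
) -> AsyncGenerator[List[str], None]:
    batch: List[str] = []
    for uri, content_hash in snapshot.items():
        await asyncio.sleep(0)  # real loaders await downloads or HTTP calls here
        batch.append(f"{uri}@{content_hash}")
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly smaller batch
        yield batch


async def main() -> None:
    async for batch in load_filtered_data_sketch({"a": "1", "b": "2", "c": "3"}, 2):
        print(batch)


asyncio.run(main())
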
backend/modules/dataloaders/local_dir_loader.py: 6 changes (3 additions & 3 deletions)
@@ -1,6 +1,6 @@
import os
import shutil
from typing import Dict, Iterator, List
from typing import AsyncGenerator, Dict, List

from backend.logger import logger
from backend.modules.dataloaders.loader import BaseDataLoader
@@ -12,14 +12,14 @@ class LocalDirLoader(BaseDataLoader):
Load data from a local directory
"""

def load_filtered_data(
async def load_filtered_data(
self,
data_source: DataSource,
dest_dir: str,
previous_snapshot: Dict[str, str],
batch_size: int,
data_ingestion_mode: DataIngestionMode,
) -> Iterator[List[LoadedDataPoint]]:
) -> AsyncGenerator[List[LoadedDataPoint], None]:
"""
Loads data from a local directory specified by the given source URI.
"""
backend/modules/dataloaders/truefoundry_loader.py: 8 changes (4 additions & 4 deletions)
@@ -1,5 +1,5 @@
import os
from typing import Dict, Iterator, List
from typing import AsyncGenerator, Dict, List

from truefoundry.ml import get_client as get_tfy_client

@@ -14,14 +14,14 @@ class TrueFoundryLoader(BaseDataLoader):
Load data from a TrueFoundry data source (data-dir).
"""

def load_filtered_data(
async def load_filtered_data(
self,
data_source: DataSource,
dest_dir: str,
previous_snapshot: Dict[str, str],
batch_size: int,
data_ingestion_mode: DataIngestionMode,
) -> Iterator[List[LoadedDataPoint]]:
) -> AsyncGenerator[List[LoadedDataPoint], None]:
"""
Loads data from a truefoundry data directory with FQN specified by the given source URI.
"""
@@ -48,7 +48,7 @@ def load_filtered_data(
# If tfy_files_dir is None, it means the data was not downloaded.
if tfy_files_dir is None:
logger.error("Download info not found")
return iter([])
raise ValueError("Failed to download data directory")

if os.path.exists(os.path.join(tfy_files_dir, "files")):
logger.debug("Files directory exists")
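
The change from `return iter([])` to `raise ValueError(...)` is more than stylistic: once `load_filtered_data` is an async generator, `return` with a value is a syntax error (PEP 525), and a bare `return` would just end the iteration, making a failed download indistinguishable from an empty data source. Raising lets the indexer see the failure. A short illustration with hypothetical names:

import asyncio
from typing import AsyncGenerator, List, Optional


async def silent(download_dir: Optional[str]) -> AsyncGenerator[List[str], None]:
    if download_dir is None:
        return  # caller just sees zero batches; the error is swallowed
    yield ["batch from " + download_dir]


async def loud(download_dir: Optional[str]) -> AsyncGenerator[List[str], None]:
    if download_dir is None:
        raise ValueError("Failed to download data directory")
    yield ["batch from " + download_dir]


async def main() -> None:
    print([b async for b in silent(None)])  # -> []
    try:
        [b async for b in loud(None)]
    except ValueError as exc:
        print("caught:", exc)


asyncio.run(main())
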
backend/modules/dataloaders/web_loader.py: 148 changes (101 additions & 47 deletions)
@@ -1,14 +1,17 @@
# Author: https://github.com/paulpierre/markdown-crawler/
# Description: A multithreaded web crawler that recursively crawls a website and creates a markdown file for each page.
import mimetypes
import os
import tempfile
from typing import Dict, Iterator, List
from datetime import date
from typing import AsyncGenerator, Dict, List, Tuple

from markdown_crawler import md_crawl
import aiohttp
from bs4 import BeautifulSoup

from backend.logger import logger
from backend.modules.dataloaders.loader import BaseDataLoader
from backend.types import DataIngestionMode, DataPoint, DataSource, LoadedDataPoint
from backend.types import DataIngestionMode, DataSource, LoadedDataPoint

DEFAULT_BASE_DIR = os.path.join(
tempfile.gettempdir(), "webloader"
@@ -21,73 +24,124 @@
DEFAULT_BASE_PATH_MATCH = True


async def fetch_sitemap(url: str) -> str:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()


async def extract_urls_from_sitemap(url: str) -> List[Tuple[str, str]]:
sitemap_url = f"{url.rstrip('/')}/sitemap.xml"
sitemap_content = await fetch_sitemap(sitemap_url)
if not sitemap_content:
logger.debug(f"No sitemap found for {url}")
return [(url, None)]
logger.debug(f"Found sitemap for {url} at {sitemap_url}")
soup = BeautifulSoup(sitemap_content, "xml")
urls = [
(
loc.text,
loc.find_next_sibling("lastmod").text
if loc.find_next_sibling("lastmod")
else None,
)
for loc in soup.find_all("loc")
]
return urls


class WebLoader(BaseDataLoader):
"""
Load data from a web URL
"""

def load_filtered_data(
async def load_filtered_data(
self,
data_source: DataSource,
dest_dir: str,
previous_snapshot: Dict[str, str],
batch_size: int,
data_ingestion_mode: DataIngestionMode,
) -> Iterator[List[LoadedDataPoint]]:
) -> AsyncGenerator[List[LoadedDataPoint], None]:
"""
Loads data from a web URL and converts it to Markdown format.
"""

md_crawl(
base_url=data_source.uri,
max_depth=DEFAULT_MAX_DEPTH,
num_threads=DEFAULT_NUM_THREADS,
base_dir=DEFAULT_BASE_DIR,
target_links=DEFAULT_TARGET_LINKS,
target_content=DEFAULT_TARGET_CONTENT,
is_domain_match=DEFAULT_DOMAIN_MATCH,
is_base_path_match=DEFAULT_BASE_PATH_MATCH,
)
logger.debug(
f"WebLoader: Crawled {data_source.uri} and saved to {DEFAULT_BASE_DIR}"
)
if not data_source.uri.startswith(("http://", "https://")):
raise ValueError(
f"Invalid URL: {data_source.uri}. URL must start with http:// or https://"
)

urls = [(data_source.uri, None)]

if data_source.metadata.get("use_sitemap", False):
urls = await extract_urls_from_sitemap(data_source.uri)
logger.debug(f"Found a total of {len(urls)} URLs.")

loaded_data_points: List[LoadedDataPoint] = []
dest_dir = DEFAULT_BASE_DIR
for root, d_names, f_names in os.walk(dest_dir):
for f in f_names:
if f.startswith("."):
continue
full_path = os.path.join(root, f)
rel_path = os.path.relpath(full_path, dest_dir)
file_ext = os.path.splitext(f)[1]
data_point = DataPoint(
data_source_fqn=data_source.fqn,
data_point_uri=rel_path,
data_point_hash=str(os.path.getsize(full_path)),
local_filepath=full_path,
file_extension=file_ext,
)

# If the data ingestion mode is incremental, check if the data point already exists.
if (
data_ingestion_mode == DataIngestionMode.INCREMENTAL
and previous_snapshot.get(data_point.data_point_fqn)
and previous_snapshot.get(data_point.data_point_fqn)
== data_point.data_point_hash
):
async with aiohttp.ClientSession() as session:
for url, lastmod in urls:
content_hash = lastmod
# If last modified date is not available, fetch it from the web url
if not content_hash:
logger.debug(f"Cannot find last modified date for {url}.")
async with session.head(url) as response:
if response.status != 200:
logger.warning(
f"Failed to fetch {url}: Status {response.status}"
)
continue

# Use ETag or Last-Modified header as the content hash
content_hash = (
response.headers.get("ETag", None)
or response.headers.get("Last-Modified", None)
or date.today().isoformat()
)
logger.debug(
f"Last modified date for {url}: {response.headers.get('Last-Modified', 'today')}"
)

else:
logger.debug(f"Last modified date for {url}: {content_hash}")

if previous_snapshot.get(url) == content_hash:
logger.debug(f"No changes detected for {url}")
continue

extension = "url"
local_filepath = url
if mime := mimetypes.guess_type(url)[0]:
extension = mimetypes.guess_extension(mime) or "url"

if extension != "url":
async with session.get(url) as response:
if response.status != 200:
logger.warning(
f"Failed to fetch {url}: Status {response.status}"
)
continue
# Could have used path as per URL but that makes us vulnerable to path traversal attacks
with tempfile.NamedTemporaryFile(
delete=False, suffix=extension, dir=dest_dir, mode="wb"
) as temp_file:
temp_file.write(await response.read())
local_filepath = temp_file.name

loaded_data_points.append(
LoadedDataPoint(
data_point_hash=data_point.data_point_hash,
data_point_uri=data_point.data_point_uri,
data_source_fqn=data_point.data_source_fqn,
local_filepath=full_path,
file_extension=file_ext,
data_point_hash=content_hash,
data_point_uri=url,
data_source_fqn=f"web::{url}",
local_filepath=local_filepath,
file_extension=extension,
)
)

if len(loaded_data_points) >= batch_size:
yield loaded_data_points
loaded_data_points.clear()
yield loaded_data_points
loaded_data_points = []

yield loaded_data_points
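
The loader's change detection boils down to: take the sitemap's lastmod when present, otherwise issue a HEAD request and use the ETag or Last-Modified header (falling back to today's date), then re-ingest only if the stored snapshot value differs. A condensed, hedged sketch of that decision, reusing only the aiohttp calls already shown above (helper names are hypothetical):

import asyncio
from datetime import date
from typing import Dict, Optional

import aiohttp


async def content_hash_for(url: str, lastmod: Optional[str]) -> Optional[str]:
    if lastmod:  # sitemap lastmod wins when available
        return lastmod
    async with aiohttp.ClientSession() as session:
        async with session.head(url) as response:
            if response.status != 200:
                return None  # unreachable URL: skip it
            return (
                response.headers.get("ETag")
                or response.headers.get("Last-Modified")
                or date.today().isoformat()  # worst case: re-ingest at most daily
            )


async def needs_reingestion(
    url: str, lastmod: Optional[str], previous_snapshot: Dict[str, str]
) -> bool:
    new_hash = await content_hash_for(url, lastmod)
    return new_hash is not None and previous_snapshot.get(url) != new_hash


print(asyncio.run(needs_reingestion("https://example.com", None, {})))
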
backend/modules/parsers/__init__.py: 2 changes (2 additions & 0 deletions)
@@ -3,9 +3,11 @@
from backend.modules.parsers.parser import register_parser
from backend.modules.parsers.unstructured_io import UnstructuredIoParser
from backend.modules.parsers.video_parser import VideoParser
from backend.modules.parsers.web_parser import WebParser

# The order of registry defines the order of precedence
register_parser("UnstructuredIoParser", UnstructuredIoParser)
register_parser("MultiModalParser", MultiModalParser)
register_parser("AudioParser", AudioParser)
register_parser("VideoParser", VideoParser)
register_parser("WebParser", WebParser)
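
Registering `WebParser` last matters because, per the comment above, registration order defines precedence when several parsers can handle the same file extension. The registry itself is not shown in this diff; a minimal, purely hypothetical version consistent with the calls above could look like:

from typing import Dict, Type

# Hypothetical sketch; the real registry lives in backend/modules/parsers/parser.py.
PARSER_REGISTRY: Dict[str, Type] = {}


def register_parser(name: str, parser_cls: Type) -> None:
    if name in PARSER_REGISTRY:
        raise ValueError(f"Parser {name} is already registered")
    # dicts preserve insertion order, so iteration order doubles as precedence
    PARSER_REGISTRY[name] = parser_cls
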
backend/modules/parsers/unstructured_io.py: 1 change (0 additions & 1 deletion)
@@ -20,7 +20,6 @@ class UnstructuredIoParser(BaseParser):
".html",
".md",
".rst",
".json",
".rtf",
".jpeg",
".png",