From ae4e3c2d21951d666bbf0b0ae4c384d18446e41f Mon Sep 17 00:00:00 2001 From: ChunHao <64747455+chuang8511@users.noreply.github.com> Date: Wed, 30 Oct 2024 14:13:27 +0000 Subject: [PATCH] feat(web): refactor the web operator (#772) Because - we want to make scrape page to pages - there are merged code not updated from the review of https://github.com/instill-ai/pipeline-backend/pull/753 This commit - make scrape page to pages - modify the code based on the review note - web operator QA results are in `Test Result` thread in the linear ticket - migration QA result is in `Migration QA` thread in the linear ticket --- .../operator/web/v0/.compogen/bottom.mdx | 38 +-- .../operator/web/v0/.compogen/scrape_page.mdx | 4 +- pkg/component/operator/web/v0/README.mdx | 77 +++--- .../operator/web/v0/config/definition.json | 4 +- .../operator/web/v0/config/tasks.json | 162 ++++++------ .../operator/web/v0/crawl_website.go | 17 +- pkg/component/operator/web/v0/helper.go | 6 +- pkg/component/operator/web/v0/main.go | 16 +- pkg/component/operator/web/v0/main_test.go | 40 ++- .../{scrape_webpage.go => scrape_webpages.go} | 121 ++++++--- .../000033_migrate_web_recipes.down.sql | 0 .../migration/000033_migrate_web_recipes.sql | 0 .../convert/convert000033/convert.go | 238 ++++++++++++++++++ pkg/db/migration/migration.go | 3 + 14 files changed, 542 insertions(+), 184 deletions(-) rename pkg/component/operator/web/v0/{scrape_webpage.go => scrape_webpages.go} (69%) create mode 100644 pkg/db/migration/000033_migrate_web_recipes.down.sql create mode 100644 pkg/db/migration/000033_migrate_web_recipes.sql create mode 100644 pkg/db/migration/convert/convert000033/convert.go diff --git a/pkg/component/operator/web/v0/.compogen/bottom.mdx b/pkg/component/operator/web/v0/.compogen/bottom.mdx index 9c0e36c2f..d80cf87b9 100644 --- a/pkg/component/operator/web/v0/.compogen/bottom.mdx +++ b/pkg/component/operator/web/v0/.compogen/bottom.mdx @@ -1,5 +1,3 @@ - - ## Example Recipes ```yaml @@ -7,38 +5,50 @@ version: v1beta variable: url: - title: url + title: URL instill-format: string component: crawler: type: web input: - root-url: ${variable.url} + url: ${variable.url} allowed-domains: - max-k: 10 - timeout: 0 + max-k: 30 + timeout: 1000 max-depth: 0 condition: task: TASK_CRAWL_SITE + json-filter: + type: json + input: + json-value: ${crawler.output.pages} + jq-filter: .[] | ."link" + condition: + task: TASK_JQ + scraper: type: web input: - url: ${crawler.output.pages[0].link} + urls: ${json-filter.output.results} + scrape-method: http include-html: false only-main-content: true remove-tags: only-include-tags: - timeout: 1000 + timeout: 0 condition: - task: TASK_SCRAPE_PAGE + task: TASK_SCRAPE_PAGES output: - markdown: - title: Markdown - value: ${scraper.output.markdown} + pages: + title: Pages + value: ${crawler.output.pages} links: - title: links - value: ${scraper.output.links-on-page} + title: Links + value: ${json-filter.output.results} + scraper-pages: + title: Scraper Pages + value: ${scraper.output.pages} ``` diff --git a/pkg/component/operator/web/v0/.compogen/scrape_page.mdx b/pkg/component/operator/web/v0/.compogen/scrape_page.mdx index fe37a4834..90d6399a9 100644 --- a/pkg/component/operator/web/v0/.compogen/scrape_page.mdx +++ b/pkg/component/operator/web/v0/.compogen/scrape_page.mdx @@ -26,7 +26,7 @@ #### About Dynamic Content -`TASK_SCRAPE_PAGE` supports fetching dynamic content from web pages by simulating user behaviours, such as scrolling down. 
The initial implementation includes the following capabilities: +`TASK_SCRAPE_PAGES` supports fetching dynamic content from web pages by simulating user behaviours, such as scrolling down. The initial implementation includes the following capabilities: Scrolling: - Mimics user scrolling down the page to load additional content dynamically. @@ -36,4 +36,4 @@ Future enhancements will include additional user interactions, such as: - Taking Screenshots: Capture screenshots of the current view. - Keyboard Actions: Simulate key presses and other keyboard interactions. -`TASK_SCRAPE_PAGE` aims to provide a robust framework for interacting with web pages and extracting dynamic content effectively. +`TASK_SCRAPE_PAGES` aims to provide a robust framework for interacting with web pages and extracting dynamic content effectively. diff --git a/pkg/component/operator/web/v0/README.mdx b/pkg/component/operator/web/v0/README.mdx index 5dfac1ccd..70c9be519 100644 --- a/pkg/component/operator/web/v0/README.mdx +++ b/pkg/component/operator/web/v0/README.mdx @@ -8,7 +8,7 @@ description: "Learn about how to set up a VDP Web component https://github.com/i The Web component is an operator component that allows users to scrape websites. It can carry out the following tasks: - [Crawl Site](#crawl-site) -- [Scrape Page](#scrape-page) +- [Scrape Pages](#scrape-pages) - [Scrape Sitemap](#scrape-sitemap) @@ -32,7 +32,7 @@ The component definition and tasks are defined in the [definition.json](https:// ### Crawl Site -This task involves systematically navigating through a website, starting from a designated page (typically the homepage), and following internal links to discover and retrieve page titles and URLs. Note that this process only gathers links and titles from multiple pages; it does not extract the content of the pages themselves. If you need to collect specific content from individual pages, please use the [Scrape Page](#scrape-page) task instead. +This task involves systematically navigating through a website, starting from a designated page (typically the homepage), and following internal links to discover and retrieve page titles and URLs. The process is limited to 120 seconds and only collects links and titles from multiple pages; it does not extract the content of the pages themselves. If you need to collect specific content from individual pages, please use the Scrape Page task instead.
@@ -72,16 +72,16 @@ This task involves systematically navigating through a website, starting from a
-### Scrape Page +### Scrape Pages -This task focuses on extracting specific data from a single targeted webpage by parsing its HTML structure. Unlike crawling, which navigates across multiple pages, scraping retrieves content only from the specified page. After scraping, the data can be further processed using a defined [jQuery](https://www.w3schools.com/jquery/jquery_syntax.asp) in a specified sequence. The sequence of jQuery filtering data will be executed in the order of `only-main-content`, `remove-tags`, and `only-include-tags`. Refer to the [jQuery Syntax Examples](#jquery-syntax-examples) for more details on how to filter and manipulate the data. +This task focuses on extracting specific data from targeted webpages by parsing its HTML structure. Unlike crawling, which navigates across multiple pages, scraping retrieves content only from the specified page. After scraping, the data can be further processed using a defined [jQuery](https://www.w3schools.com/jquery/jquery_syntax.asp) in a specified sequence. The sequence of jQuery filtering data will be executed in the order of `only-main-content`, `remove-tags`, and `only-include-tags`. Refer to the [jQuery Syntax Examples](#jquery-syntax-examples) for more details on how to filter and manipulate the data. To avoid a single URL failure from affecting all requests, we will not return an error when an individual URL fails. Instead, we will return all contents that are successfully scraped.
| Input | ID | Type | Description | | :--- | :--- | :--- | :--- | -| Task ID (required) | `task` | string | `TASK_SCRAPE_PAGE` | -| URL (required) | `url` | string | The URL to scrape the webpage contents. | +| Task ID (required) | `task` | string | `TASK_SCRAPE_PAGES` | +| URLs (required) | `urls` | array[string] | The URLs to scrape the webpage contents. | | Scrape Method (required) | `scrape-method` | string | Defines the method used for web scraping. Available options include 'http' for standard HTTP-based scraping and 'chrome-simulator' for scraping through a simulated Chrome browser environment. | | Include HTML | `include-html` | boolean | Indicate whether to include the raw HTML of the webpage in the output. If you want to include the raw HTML, set this to true. | | Only Main Content | `only-main-content` | boolean | Only return the main content of the page by excluding the content of the tag of header, nav, footer. | @@ -99,17 +99,26 @@ This task focuses on extracting specific data from a single targeted webpage by | Output | ID | Type | Description | | :--- | :--- | :--- | :--- | -| Content | `content` | string | The scraped plain content without html tags of the webpage. | -| Markdown | `markdown` | string | The scraped markdown of the webpage. | -| HTML (optional) | `html` | string | The scraped html of the webpage. | -| [Metadata](#scrape-page-metadata) (optional) | `metadata` | object | The metadata of the webpage. | -| Links on Page (optional) | `links-on-page` | array[string] | The list of links on the webpage. | +| [Pages](#scrape-pages-pages) | `pages` | array[object] | A list of page objects that have been scraped. |
- Output Objects in Scrape Page + Output Objects in Scrape Pages + +

Pages

+ +
+ +| Field | Field ID | Type | Note | +| :--- | :--- | :--- | :--- | +| Content | `content` | string | The scraped plain content without html tags of the webpage. | +| HTML | `html` | string | The scraped html of the webpage. | +| Links on Page | `links-on-page` | array | The list of links on the webpage. | +| Markdown | `markdown` | string | The scraped markdown of the webpage. | +| [Metadata](#scrape-pages-metadata) | `metadata` | object | The metadata of the webpage. | +
-

Metadata

+

Metadata

@@ -148,7 +157,7 @@ This task focuses on extracting specific data from a single targeted webpage by #### About Dynamic Content -`TASK_SCRAPE_PAGE` supports fetching dynamic content from web pages by simulating user behaviours, such as scrolling down. The initial implementation includes the following capabilities: +`TASK_SCRAPE_PAGES` supports fetching dynamic content from web pages by simulating user behaviours, such as scrolling down. The initial implementation includes the following capabilities: Scrolling: - Mimics user scrolling down the page to load additional content dynamically. @@ -158,7 +167,7 @@ Future enhancements will include additional user interactions, such as: - Taking Screenshots: Capture screenshots of the current view. - Keyboard Actions: Simulate key presses and other keyboard interactions. -`TASK_SCRAPE_PAGE` aims to provide a robust framework for interacting with web pages and extracting dynamic content effectively. +`TASK_SCRAPE_PAGES` aims to provide a robust framework for interacting with web pages and extracting dynamic content effectively. ### Scrape Sitemap @@ -185,8 +194,6 @@ This task extracts data directly from a website’s sitemap. A sitemap is typica
- - ## Example Recipes ```yaml @@ -194,38 +201,50 @@ version: v1beta variable: url: - title: url + title: URL instill-format: string component: crawler: type: web input: - root-url: ${variable.url} + url: ${variable.url} allowed-domains: - max-k: 10 - timeout: 0 + max-k: 30 + timeout: 1000 max-depth: 0 condition: task: TASK_CRAWL_SITE + json-filter: + type: json + input: + json-value: ${crawler.output.pages} + jq-filter: .[] | ."link" + condition: + task: TASK_JQ + scraper: type: web input: - url: ${crawler.output.pages[0].link} + urls: ${json-filter.output.results} + scrape-method: http include-html: false only-main-content: true remove-tags: only-include-tags: - timeout: 1000 + timeout: 0 condition: - task: TASK_SCRAPE_PAGE + task: TASK_SCRAPE_PAGES output: - markdown: - title: Markdown - value: ${scraper.output.markdown} + pages: + title: Pages + value: ${crawler.output.pages} links: - title: links - value: ${scraper.output.links-on-page} + title: Links + value: ${json-filter.output.results} + scraper-pages: + title: Scraper Pages + value: ${scraper.output.pages} ``` diff --git a/pkg/component/operator/web/v0/config/definition.json b/pkg/component/operator/web/v0/config/definition.json index a9fc31a6e..21cd68b5a 100644 --- a/pkg/component/operator/web/v0/config/definition.json +++ b/pkg/component/operator/web/v0/config/definition.json @@ -1,7 +1,7 @@ { "availableTasks": [ "TASK_CRAWL_SITE", - "TASK_SCRAPE_PAGE", + "TASK_SCRAPE_PAGES", "TASK_SCRAPE_SITEMAP" ], "documentationUrl": "https://www.instill.tech/docs/component/operator/web", @@ -11,7 +11,7 @@ "title": "Web", "type": "COMPONENT_TYPE_OPERATOR", "uid": "98909958-db7d-4dfe-9858-7761904be17e", - "version": "0.3.0", + "version": "0.4.0", "sourceUrl": "https://github.com/instill-ai/pipeline-backend/blob/main/pkg/component/operator/web/v0", "description": "Scrape websites", "releaseStage": "RELEASE_STAGE_ALPHA" diff --git a/pkg/component/operator/web/v0/config/tasks.json b/pkg/component/operator/web/v0/config/tasks.json index e7bb14bf5..035d3618c 100644 --- a/pkg/component/operator/web/v0/config/tasks.json +++ b/pkg/component/operator/web/v0/config/tasks.json @@ -26,7 +26,7 @@ } }, "TASK_CRAWL_SITE": { - "instillShortDescription": "This task involves systematically navigating through a website, starting from a designated page (typically the homepage), and following internal links to discover and retrieve page titles and URLs. Note that this process only gathers links and titles from multiple pages; it does not extract the content of the pages themselves. If you need to collect specific content from individual pages, please use the [Scrape Page](#scrape-page) task instead.", + "instillShortDescription": "This task involves systematically navigating through a website, starting from a designated page (typically the homepage), and following internal links to discover and retrieve page titles and URLs. The process is limited to 120 seconds and only collects links and titles from multiple pages; it does not extract the content of the pages themselves. If you need to collect specific content from individual pages, please use the Scrape Page task instead.", "input": { "instillUIOrder": 0, "properties": { @@ -219,23 +219,26 @@ "type": "object" } }, - "TASK_SCRAPE_PAGE": { - "instillShortDescription": "This task focuses on extracting specific data from a single targeted webpage by parsing its HTML structure. Unlike crawling, which navigates across multiple pages, scraping retrieves content only from the specified page. 
After scraping, the data can be further processed using a defined [jQuery](https://www.w3schools.com/jquery/jquery_syntax.asp) in a specified sequence. The sequence of jQuery filtering data will be executed in the order of `only-main-content`, `remove-tags`, and `only-include-tags`. Refer to the [jQuery Syntax Examples](#jquery-syntax-examples) for more details on how to filter and manipulate the data.", + "TASK_SCRAPE_PAGES": { + "instillShortDescription": "This task focuses on extracting specific data from targeted webpages by parsing its HTML structure. Unlike crawling, which navigates across multiple pages, scraping retrieves content only from the specified page. After scraping, the data can be further processed using a defined [jQuery](https://www.w3schools.com/jquery/jquery_syntax.asp) in a specified sequence. The sequence of jQuery filtering data will be executed in the order of `only-main-content`, `remove-tags`, and `only-include-tags`. Refer to the [jQuery Syntax Examples](#jquery-syntax-examples) for more details on how to filter and manipulate the data. To avoid a single URL failure from affecting all requests, we will not return an error when an individual URL fails. Instead, we will return all contents that are successfully scraped.", "input": { "instillUIOrder": 0, "properties": { - "url": { - "description": "The URL to scrape the webpage contents.", + "urls": { + "description": "The URLs to scrape the webpage contents.", "instillAcceptFormats": [ - "string" + "array:string" ], + "items": { + "type": "string" + }, "instillUIOrder": 0, "instillUpstreamTypes": [ "value", "reference" ], - "title": "URL", - "type": "string" + "title": "URLs", + "type": "array" }, "scrape-method": { "description": "Defines the method used for web scraping. 
Available options include 'http' for standard HTTP-based scraping and 'chrome-simulator' for scraping through a simulated Chrome browser environment.", @@ -332,7 +335,7 @@ } }, "required": [ - "url", + "urls", "scrape-method" ], "title": "Input", @@ -341,76 +344,91 @@ "output": { "instillUIOrder": 0, "properties": { - "content": { - "description": "The scraped plain content without html tags of the webpage.", - "instillFormat": "string", + "pages": { + "description": "A list of page objects that have been scraped.", + "instillFormat": "array:object", "instillUIOrder": 0, - "title": "Content", - "type": "string" - }, - "markdown": { - "description": "The scraped markdown of the webpage.", - "instillFormat": "string", - "instillUIOrder": 1, - "title": "Markdown", - "type": "string" - }, - "html": { - "description": "The scraped html of the webpage.", - "instillFormat": "string", - "instillUIMultiline": true, - "instillUIOrder": 2, - "title": "HTML", - "type": "string" - }, - "metadata": { - "description": "The metadata of the webpage.", - "instillFormat": "object", - "instillUIOrder": 3, - "properties": { - "title": { - "description": "The title of the webpage.", - "instillFormat": "string", - "title": "Title", - "instillUIOrder": 0, - "type": "string" - }, - "description": { - "description": "The description of the webpage.", - "instillFormat": "string", - "title": "Description", - "instillUIOrder": 1, - "type": "string" - }, - "source-url": { - "description": "The source URL of the webpage.", - "instillFormat": "string", - "title": "Source URL", - "instillUIOrder": 2, - "type": "string" - } - }, - "required": [ - "title", - "source-url" - ], - "title": "Metadata", - "type": "object" - }, - "links-on-page": { - "description": "The list of links on the webpage.", - "instillUIOrder": 4, - "instillFormat": "array:string", "items": { - "type": "string" + "type": "object", + "properties": { + "content": { + "description": "The scraped plain content without html tags of the webpage.", + "instillFormat": "string", + "instillUIOrder": 0, + "title": "Content", + "type": "string" + }, + "markdown": { + "description": "The scraped markdown of the webpage.", + "instillFormat": "string", + "instillUIOrder": 1, + "title": "Markdown", + "type": "string" + }, + "html": { + "description": "The scraped html of the webpage.", + "instillFormat": "string", + "instillUIMultiline": true, + "instillUIOrder": 2, + "title": "HTML", + "type": "string" + }, + "metadata": { + "description": "The metadata of the webpage.", + "instillFormat": "object", + "instillUIOrder": 3, + "properties": { + "title": { + "description": "The title of the webpage.", + "instillFormat": "string", + "title": "Title", + "instillUIOrder": 0, + "type": "string" + }, + "description": { + "description": "The description of the webpage.", + "instillFormat": "string", + "title": "Description", + "instillUIOrder": 1, + "type": "string" + }, + "source-url": { + "description": "The source URL of the webpage.", + "instillFormat": "string", + "title": "Source URL", + "instillUIOrder": 2, + "type": "string" + } + }, + "required": [ + "title", + "source-url" + ], + "title": "Metadata", + "type": "object" + }, + "links-on-page": { + "description": "The list of links on the webpage.", + "instillUIOrder": 4, + "instillFormat": "array:string", + "items": { + "type": "string" + }, + "title": "Links on Page", + "type": "array" + } + }, + "required": [ + "content", + "markdown" + ] }, - "title": "Links on Page", + "title": "Pages", "type": "array" } }, 
"required": [ - "content", - "markdown" + "pages" ], "title": "Output", "type": "object" diff --git a/pkg/component/operator/web/v0/crawl_website.go b/pkg/component/operator/web/v0/crawl_website.go index be59ab4d5..806813408 100644 --- a/pkg/component/operator/web/v0/crawl_website.go +++ b/pkg/component/operator/web/v0/crawl_website.go @@ -45,6 +45,10 @@ func (i *CrawlWebsiteInput) preset() { if i.MaxK <= 0 { // When the users set to 0, it means infinite. // However, there is performance issue when we set it to infinite. + // The issue may come from the conflict of goruntine and colly library. + // We have not targeted the specific reason of the issue. + // We set 120 seconds as the timeout in CrawlSite function. + // After testing, we found that we can crawl around 8000 pages in 120 seconds. // So, we set the default value to solve performance issue easily. i.MaxK = 8000 } @@ -85,9 +89,9 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro // Wont be called if error occurs c.OnHTML("a[href]", func(e *colly.HTMLElement) { mu.Lock() + defer mu.Unlock() if ctx.Err() != nil { - mu.Unlock() return } @@ -96,19 +100,16 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro // However, when K is big, the output length could be less than K. // So, I set twice the MaxK to stop the scraping. if len(pageLinks) >= inputStruct.MaxK*getPageTimes(inputStruct.MaxK) { - mu.Unlock() return } link := e.Attr("href") if util.InSlice(pageLinks, link) { - mu.Unlock() return } pageLinks = append(pageLinks, link) - mu.Unlock() _ = e.Request.Visit(link) }) @@ -120,15 +121,13 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro c.OnRequest(func(r *colly.Request) { mu.Lock() + defer mu.Unlock() // Before length of output page is over, we should always send request. if (len(output.Pages) >= inputStruct.MaxK) || ctx.Err() != nil { r.Abort() - mu.Unlock() return } - - mu.Unlock() // Set a random user agent to avoid being blocked by websites r.Headers.Set("User-Agent", randomString()) }) @@ -157,20 +156,18 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro page.Title = title mu.Lock() + defer mu.Unlock() // If we do not set this condition, the length of output.Pages could be over the limit. if len(output.Pages) < inputStruct.MaxK { output.Pages = append(output.Pages, page) // If the length of output.Pages is equal to MaxK, we should stop the scraping. 
if len(output.Pages) == inputStruct.MaxK { - mu.Unlock() cancel() return } - mu.Unlock() return } - mu.Unlock() cancel() }) diff --git a/pkg/component/operator/web/v0/helper.go b/pkg/component/operator/web/v0/helper.go index e02472999..4171e7125 100644 --- a/pkg/component/operator/web/v0/helper.go +++ b/pkg/component/operator/web/v0/helper.go @@ -16,15 +16,15 @@ type scrapeInput interface { onlyIncludeTags() []string } -func (i ScrapeWebpageInput) onlyMainContent() bool { +func (i ScrapeWebpagesInput) onlyMainContent() bool { return i.OnlyMainContent } -func (i ScrapeWebpageInput) removeTags() []string { +func (i ScrapeWebpagesInput) removeTags() []string { return i.RemoveTags } -func (i ScrapeWebpageInput) onlyIncludeTags() []string { +func (i ScrapeWebpagesInput) onlyIncludeTags() []string { return i.OnlyIncludeTags } diff --git a/pkg/component/operator/web/v0/main.go b/pkg/component/operator/web/v0/main.go index a72837d5e..a71e1f5e6 100644 --- a/pkg/component/operator/web/v0/main.go +++ b/pkg/component/operator/web/v0/main.go @@ -1,4 +1,4 @@ -//go:generate compogen readme ./config ./README.mdx --extraContents TASK_SCRAPE_PAGE=.compogen/scrape_page.mdx --extraContents bottom=.compogen/bottom.mdx +//go:generate compogen readme ./config ./README.mdx --extraContents TASK_SCRAPE_PAGES=.compogen/scrape_page.mdx --extraContents bottom=.compogen/bottom.mdx package web import ( @@ -17,7 +17,7 @@ import ( const ( taskCrawlSite = "TASK_CRAWL_SITE" - taskScrapePage = "TASK_SCRAPE_PAGE" + taskScrapePages = "TASK_SCRAPE_PAGES" taskScrapeSitemap = "TASK_SCRAPE_SITEMAP" ) @@ -37,9 +37,9 @@ type component struct { type execution struct { base.ComponentExecution - execute func(*structpb.Struct) (*structpb.Struct, error) - externalCaller func(url string) (ioCloser io.ReadCloser, err error) - getDocAfterRequestURL func(url string, timeout int, scrapeMethod string) (*goquery.Document, error) + execute func(*structpb.Struct) (*structpb.Struct, error) + externalCaller func(url string) (ioCloser io.ReadCloser, err error) + getDocsAfterRequestURLs func(urls []string, timeout int, scrapeMethod string) ([]*goquery.Document, error) } func Init(bc base.Component) *component { @@ -65,9 +65,9 @@ func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution, // To make mocking easier e.externalCaller = scrapSitemapCaller e.execute = e.ScrapeSitemap - case taskScrapePage: - e.getDocAfterRequestURL = getDocAfterRequestURL - e.execute = e.ScrapeWebpage + case taskScrapePages: + e.getDocsAfterRequestURLs = getDocAfterRequestURL + e.execute = e.ScrapeWebpages default: return nil, fmt.Errorf("%s task is not supported", x.Task) } diff --git a/pkg/component/operator/web/v0/main_test.go b/pkg/component/operator/web/v0/main_test.go index 4024a2b81..b3eed7f77 100644 --- a/pkg/component/operator/web/v0/main_test.go +++ b/pkg/component/operator/web/v0/main_test.go @@ -2,6 +2,7 @@ package web import ( "io" + "net/url" "strings" "testing" @@ -63,20 +64,20 @@ func fakeScrapeSitemapCaller(url string) (ioCloser io.ReadCloser, err error) { return io.NopCloser(strings.NewReader(xml)), nil } -func TestScrapeWebpage(t *testing.T) { +func TestScrapeWebpages(t *testing.T) { c := quicktest.New(t) c.Run("ScrapeWebpage", func(c *quicktest.C) { component := Init(base.Component{}) e := &execution{ - ComponentExecution: base.ComponentExecution{Component: component, SystemVariables: nil, Setup: nil, Task: taskScrapePage}, - getDocAfterRequestURL: fakeHTTPRequest, + ComponentExecution: base.ComponentExecution{Component: component, 
SystemVariables: nil, Setup: nil, Task: taskScrapePages}, + getDocsAfterRequestURLs: fakeHTTPRequests, } - e.execute = e.ScrapeWebpage + e.execute = e.ScrapeWebpages - input := &ScrapeWebpageInput{ - URL: "https://www.example.com", + input := &ScrapeWebpagesInput{ + URLs: []string{"https://www.example.com"}, } inputStruct, err := base.ConvertToStructpb(input) @@ -85,18 +86,18 @@ func TestScrapeWebpage(t *testing.T) { output, err := e.execute(inputStruct) c.Assert(err, quicktest.IsNil) - var outputStruct ScrapeWebpageOutput + var outputStruct ScrapeWebpagesOutput err = base.ConvertFromStructpb(output, &outputStruct) c.Assert(err, quicktest.IsNil) - c.Assert(outputStruct.Metadata.Title, quicktest.Equals, "Test") - c.Assert(outputStruct.Metadata.Description, quicktest.Equals, "") - c.Assert(outputStruct.Metadata.SourceURL, quicktest.Equals, "https://www.example.com") + c.Assert(outputStruct.Pages[0].Metadata.Title, quicktest.Equals, "Test") + c.Assert(outputStruct.Pages[0].Metadata.Description, quicktest.Equals, "") + c.Assert(outputStruct.Pages[0].Metadata.SourceURL, quicktest.Equals, "https://www.example.com") }) } -func fakeHTTPRequest(url string, timeout int, scrapeMethod string) (*goquery.Document, error) { +func fakeHTTPRequests(urls []string, timeout int, scrapeMethod string) ([]*goquery.Document, error) { html := ` @@ -110,5 +111,20 @@ func fakeHTTPRequest(url string, timeout int, scrapeMethod string) (*goquery.Doc ` - return goquery.NewDocumentFromReader(strings.NewReader(html)) + output := []*goquery.Document{} + + doc, err := goquery.NewDocumentFromReader(strings.NewReader(html)) + + if err != nil { + return nil, err + } + doc.Url, err = url.Parse("https://www.example.com") + + if err != nil { + return nil, err + } + + output = append(output, doc) + + return output, nil } diff --git a/pkg/component/operator/web/v0/scrape_webpage.go b/pkg/component/operator/web/v0/scrape_webpages.go similarity index 69% rename from pkg/component/operator/web/v0/scrape_webpage.go rename to pkg/component/operator/web/v0/scrape_webpages.go index 940d52846..de42b4315 100644 --- a/pkg/component/operator/web/v0/scrape_webpage.go +++ b/pkg/component/operator/web/v0/scrape_webpages.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "strings" + "sync" "time" "github.com/PuerkitoBio/goquery" @@ -17,10 +18,10 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/component/internal/util" ) -// ScrapeWebpageInput defines the input of the scrape webpage task -type ScrapeWebpageInput struct { - // URL: The URL of the webpage to scrape. - URL string `json:"url"` +// ScrapeWebpagesInput defines the input of the scrape webpage task +type ScrapeWebpagesInput struct { + // URLs: The URLs of the webpage to scrape. + URLs []string `json:"urls"` // ScrapeMethod: The method to scrape the webpage. It can be "http" or "chromedp". ScrapeMethod string `json:"scrape-method"` // IncludeHTML: Whether to include the HTML content of the webpage. @@ -35,17 +36,21 @@ type ScrapeWebpageInput struct { Timeout int `json:"timeout,omitempty"` } +// ScrapeWebpagesOutput defines the output of the scrape webpage task +type ScrapeWebpagesOutput struct { + Pages []ScrapedPage `json:"pages"` +} -// ScrapeWebpageOutput defines the output of the scrape webpage task -type ScrapeWebpageOutput struct { +// ScrapedPage defines the struct of a webpage. +type ScrapedPage struct { // Content: The plain text content of the webpage. - Content string `json:"content"` + Content string `json:"content"` // Markdown: The markdown content of the webpage. 
- Markdown string `json:"markdown"` + Markdown string `json:"markdown"` // HTML: The HTML content of the webpage. - HTML string `json:"html"` + HTML string `json:"html"` // Metadata: The metadata of the webpage. - Metadata Metadata `json:"metadata"` + Metadata Metadata `json:"metadata"` // LinksOnPage: The list of links on the webpage. LinksOnPage []string `json:"links-on-page"` } @@ -53,17 +58,17 @@ type ScrapeWebpageOutput struct { // Metadata defines the metadata of the webpage type Metadata struct { // Title: The title of the webpage. - Title string `json:"title"` + Title string `json:"title"` // Description: The description of the webpage. Description string `json:"description,omitempty"` // SourceURL: The source URL of the webpage. - SourceURL string `json:"source-url"` + SourceURL string `json:"source-url"` } -// ScrapeWebpage scrapes the content of a webpage -func (e *execution) ScrapeWebpage(input *structpb.Struct) (*structpb.Struct, error) { +// ScrapeWebpages scrapes the objects of webpages +func (e *execution) ScrapeWebpages(input *structpb.Struct) (*structpb.Struct, error) { - inputStruct := ScrapeWebpageInput{} + inputStruct := ScrapeWebpagesInput{} err := base.ConvertFromStructpb(input, &inputStruct) @@ -71,17 +76,20 @@ func (e *execution) ScrapeWebpage(input *structpb.Struct) (*structpb.Struct, err return nil, fmt.Errorf("error converting input to struct: %v", err) } - output := ScrapeWebpageOutput{} + output := ScrapeWebpagesOutput{} - doc, err := e.getDocAfterRequestURL(inputStruct.URL, inputStruct.Timeout, inputStruct.ScrapeMethod) + docs, err := e.getDocsAfterRequestURLs(inputStruct.URLs, inputStruct.Timeout, inputStruct.ScrapeMethod) if err != nil { return nil, fmt.Errorf("error getting HTML page doc: %v", err) } - html := getRemovedTagsHTML(doc, inputStruct) + for _, doc := range docs { + html := getRemovedTagsHTML(doc, inputStruct) + + err = setOutput(&output, inputStruct, doc, html) - err = setOutput(&output, inputStruct, doc, html) + } if err != nil { return nil, fmt.Errorf("error setting output: %v", err) @@ -91,12 +99,55 @@ func (e *execution) ScrapeWebpage(input *structpb.Struct) (*structpb.Struct, err } -func getDocAfterRequestURL(url string, timeout int, scrapeMethod string) (*goquery.Document, error) { +func getDocAfterRequestURL(urls []string, timeout int, scrapeMethod string) ([]*goquery.Document, error) { + var wg sync.WaitGroup + docCh := make(chan *goquery.Document, len(urls)) + errCh := make(chan error, len(urls)) + + for _, url := range urls { + wg.Add(1) + go func(url string) { + defer wg.Done() + var doc *goquery.Document + var err error + if scrapeMethod == "http" { + doc, err = httpRequest(url) + } else { + doc, err = requestToWebpage(url, timeout) + } + if err != nil { + errCh <- err + return + } + doc.Url, err = doc.Url.Parse(url) + if err != nil { + errCh <- err + return + } + + docCh <- doc + }(url) + } + + go func() { + wg.Wait() + close(docCh) + close(errCh) + }() + + docs := []*goquery.Document{} + for doc := range docCh { + docs = append(docs, doc) + } - if scrapeMethod == "http" { - return httpRequest(url) + // When pipeline combines crawler and scraper, there may be some urls not able to get the doc. + // To avoid the errors, we should return the docs that are able to be fetched and ignore the failed ones. 
+ if len(errCh) > 0 { + log.Printf("get HTML page doc: %v", <-errCh) } - return requestToWebpage(url, timeout) + + return docs, nil + } func httpRequest(url string) (*goquery.Document, error) { @@ -190,21 +241,25 @@ func buildTags(tags []string) string { return tagsString } -func setOutput(output *ScrapeWebpageOutput, input ScrapeWebpageInput, doc *goquery.Document, html string) error { +func setOutput(output *ScrapeWebpagesOutput, input ScrapeWebpagesInput, doc *goquery.Document, html string) error { plain := html2text.HTML2Text(html) - output.Content = plain + scrapedPage := ScrapedPage{} + + scrapedPage.Content = plain if input.IncludeHTML { - output.HTML = html + scrapedPage.HTML = html } - markdown, err := getMarkdown(html, input.URL) + url := doc.Url.String() + + markdown, err := getMarkdown(html, url) if err != nil { return fmt.Errorf("failed to get markdown: %v", err) } - output.Markdown = markdown + scrapedPage.Markdown = markdown title := util.ScrapeWebpageTitle(doc) description := util.ScrapeWebpageDescription(doc) @@ -212,17 +267,19 @@ func setOutput(output *ScrapeWebpageOutput, input ScrapeWebpageInput, doc *goque metadata := Metadata{ Title: title, Description: description, - SourceURL: input.URL, + SourceURL: url, } - output.Metadata = metadata + scrapedPage.Metadata = metadata - links, err := getAllLinksOnPage(doc, input.URL) + links, err := getAllLinksOnPage(doc, url) if err != nil { return fmt.Errorf("failed to get links on page: %v", err) } - output.LinksOnPage = links + scrapedPage.LinksOnPage = links + + output.Pages = append(output.Pages, scrapedPage) return nil diff --git a/pkg/db/migration/000033_migrate_web_recipes.down.sql b/pkg/db/migration/000033_migrate_web_recipes.down.sql new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/db/migration/000033_migrate_web_recipes.sql b/pkg/db/migration/000033_migrate_web_recipes.sql new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/db/migration/convert/convert000033/convert.go b/pkg/db/migration/convert/convert000033/convert.go new file mode 100644 index 000000000..1c0827f9a --- /dev/null +++ b/pkg/db/migration/convert/convert000033/convert.go @@ -0,0 +1,238 @@ +package convert000033 + +import ( + "fmt" + "strings" + + "go.uber.org/zap" + "gopkg.in/yaml.v3" + "gorm.io/gorm" + + "github.com/instill-ai/pipeline-backend/pkg/datamodel" + "github.com/instill-ai/pipeline-backend/pkg/db/migration/convert" +) + +const batchSize = 100 + +type ConvertWebFields struct { + convert.Basic +} + +func (c *ConvertWebFields) Migrate() error { + if err := c.migratePipeline(); err != nil { + return err + } + return c.migratePipelineRelease() +} + +func (c *ConvertWebFields) getComponentTypeQuery(compType string) *gorm.DB { + pattern := fmt.Sprintf(`type:\s+%s`, compType) + return c.DB.Select("uid", "recipe_yaml", "recipe"). + Where("recipe_yaml ~ ?", pattern). + Where("delete_time IS NULL") +} + +func (c *ConvertWebFields) migratePipeline() error { + q := c.getComponentTypeQuery("web") + + pipelines := make([]*datamodel.Pipeline, 0, batchSize) + return q.FindInBatches(&pipelines, batchSize, func(tx *gorm.DB, _ int) error { + for _, p := range pipelines { + isRecipeUpdated := false + l := c.Logger.With(zap.String("pipelineUID", p.UID.String())) + + if p.Recipe == nil { + continue + } + + var webComponentNames []string + + for id, comp := range p.Recipe.Component { + isComponentUpdated, err := c.updateWebInput(id, comp, &webComponentNames) + if err != nil { + l.With(zap.String("componentID", id), zap.Error(err)). 
+ Error("Failed to update pipeline.") + + return fmt.Errorf("updating pipeline component: %w", err) + } + + isRecipeUpdated = isComponentUpdated || isRecipeUpdated + } + + if !isRecipeUpdated { + continue + } + + recipeYAML, err := yaml.Marshal(p.Recipe) + if err != nil { + return fmt.Errorf("marshalling recipe: %w", err) + } + + if len(webComponentNames) == 0 { + result := tx.Model(p).Where("uid = ?", p.UID).Update("recipe_yaml", string(recipeYAML)) + if result.Error != nil { + l.Error("Failed to update pipeline release.") + return fmt.Errorf("updating pipeline recipe: %w", result.Error) + } + continue + } + + updatedRecipe, err := c.updateWebOutputReceiver(recipeYAML, webComponentNames) + + if err != nil { + return fmt.Errorf("updating pipeline output receiver: %w", err) + } + + result := tx.Model(p).Where("uid = ?", p.UID).Update("recipe_yaml", updatedRecipe) + if result.Error != nil { + l.Error("Failed to update pipeline release.") + return fmt.Errorf("updating pipeline recipe: %w", result.Error) + } + + } + + return nil + }).Error +} + +func (c *ConvertWebFields) migratePipelineRelease() error { + q := c.getComponentTypeQuery("web") + + pipelines := make([]*datamodel.Pipeline, 0, batchSize) + return q.FindInBatches(&pipelines, batchSize, func(tx *gorm.DB, _ int) error { + for _, p := range pipelines { + isRecipeUpdated := false + l := c.Logger.With(zap.String("pipelineReleaseUID", p.UID.String())) + + if p.Recipe == nil { + continue + } + + var webComponentNames []string + + for id, comp := range p.Recipe.Component { + isComponentUpdated, err := c.updateWebInput(id, comp, &webComponentNames) + if err != nil { + l.With(zap.String("componentID", id), zap.Error(err)). + Error("Failed to update pipeline.") + + return fmt.Errorf("updating pipeline component: %w", err) + } + isRecipeUpdated = isComponentUpdated || isRecipeUpdated + } + + if !isRecipeUpdated { + continue + } + + recipeYAML, err := yaml.Marshal(p.Recipe) + if err != nil { + return fmt.Errorf("marshalling recipe: %w", err) + } + + if len(webComponentNames) == 0 { + result := tx.Model(p).Where("uid = ?", p.UID).Update("recipe_yaml", string(recipeYAML)) + if result.Error != nil { + l.Error("Failed to update pipeline release.") + return fmt.Errorf("updating pipeline recipe: %w", result.Error) + } + continue + } + + updatedRecipe, err := c.updateWebOutputReceiver(recipeYAML, webComponentNames) + + if err != nil { + return fmt.Errorf("updating pipeline output receiver: %w", err) + } + + result := tx.Model(p).Where("uid = ?", p.UID).Update("recipe_yaml", updatedRecipe) + if result.Error != nil { + l.Error("Failed to update pipeline release.") + return fmt.Errorf("updating pipeline recipe: %w", result.Error) + } + + } + + return nil + }).Error +} + +func (c *ConvertWebFields) updateWebInput(compName string, comp *datamodel.Component, webComponentNames *[]string) (bool, error) { + + if comp.Type == "iterator" { + isComponentUpdated := false + for compNameInIterator, comp := range comp.Component { + isUpdated, err := c.updateWebInput(compNameInIterator, comp, webComponentNames) + if err != nil { + return false, err + } + isComponentUpdated = isUpdated || isComponentUpdated + } + return isComponentUpdated, nil + } + + if comp.Type != "web" { + return false, nil + } + + if comp.Task != "TASK_SCRAPE_PAGE" { + return false, nil + } + + comp.Task = "TASK_SCRAPE_PAGES" + + input, isMap := comp.Input.(map[string]interface{}) + + if !isMap { + return false, nil + } + + if v, ok := input["url"]; ok { + input["urls"] = []string{v.(string)} + 
delete(input, "url") + } + + timeout, timeoutFound := input["timeout"] + + timeoutType := fmt.Sprintf("%T", timeout) + + if timeoutFound && timeout != nil && timeoutType == "int" && timeout.(int) > 0 { + input["scrape-method"] = "chrome-simulator" + } + + _, scrapeMethodFound := input["scrape-method"] + + if !scrapeMethodFound { + input["scrape-method"] = "http" + } + + *webComponentNames = append(*webComponentNames, compName) + + return true, nil +} + +func (c *ConvertWebFields) updateWebOutputReceiver(recipeYAML []byte, webComponentNames []string) (string, error) { + originalFields := []string{ + "content", + "markdown", + "html", + "metadata", + "links-on-page", + } + recipeString := string(recipeYAML) + + updatedRecipe := recipeString + for _, compName := range webComponentNames { + for _, field := range originalFields { + // Don't have to add `}` because it could be `links-on-page[0]` + // It will be wrong if we add `}` at the end + originalWebOutput := fmt.Sprintf("${%s.output.%s", compName, field) + if !strings.Contains(recipeString, originalWebOutput) { + continue + } + updatedRecipe = strings.ReplaceAll(updatedRecipe, originalWebOutput, fmt.Sprintf("${%s.output.pages[0].%s", compName, field)) + } + } + + return updatedRecipe, nil +} diff --git a/pkg/db/migration/migration.go b/pkg/db/migration/migration.go index 8e36c28f7..f5ef42633 100644 --- a/pkg/db/migration/migration.go +++ b/pkg/db/migration/migration.go @@ -15,6 +15,7 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/db/migration/convert/convert000029" "github.com/instill-ai/pipeline-backend/pkg/db/migration/convert/convert000031" "github.com/instill-ai/pipeline-backend/pkg/db/migration/convert/convert000032" + "github.com/instill-ai/pipeline-backend/pkg/db/migration/convert/convert000033" "github.com/instill-ai/pipeline-backend/pkg/external" "github.com/instill-ai/pipeline-backend/pkg/logger" @@ -76,6 +77,8 @@ func Migrate(version uint) error { m = &convert000031.SlackSetupConverter{Basic: bc} case 32: m = &convert000032.ConvertToWeb{Basic: bc} + case 33: + m = &convert000033.ConvertWebFields{Basic: bc} default: return nil }
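
For reference, a minimal before/after sketch of the recipe rewrite that `convert000033` performs (task rename, `url` wrapped into a one-element `urls` array, `scrape-method` backfilled, output references moved under `pages[0]`). The component name `scraper`, the `${variable.url}` reference, and the specific input fields are hypothetical; the fragment is illustrative only and not part of the patch.

```yaml
# Hypothetical recipe fragment before migration 000033.
component:
  scraper:
    type: web
    input:
      url: ${variable.url}
      include-html: false
      timeout: 0
    task: TASK_SCRAPE_PAGE
output:
  markdown:
    title: Markdown
    value: ${scraper.output.markdown}
---
# The same fragment after migration 000033: the task is renamed, `url` becomes a
# one-element `urls` array, `scrape-method` is backfilled with "http" (it would be
# "chrome-simulator" if `timeout` were a positive integer), and output references
# are rewritten from `${scraper.output.<field>}` to `${scraper.output.pages[0].<field>}`.
component:
  scraper:
    type: web
    input:
      urls:
        - ${variable.url}
      scrape-method: http
      include-html: false
      timeout: 0
    task: TASK_SCRAPE_PAGES
output:
  markdown:
    title: Markdown
    value: ${scraper.output.pages[0].markdown}
```

Rewriting existing output references to `pages[0]` keeps already-deployed single-URL pipelines behaving as before, while the renamed task gains multi-URL support.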