feat(web): refactor web operator #753
@@ -1,8 +1,8 @@
package web

import (
    "context"
    "fmt"
    "log"
    "net/url"
    "strings"
    "sync"
@@ -19,8 +19,11 @@ import (
    "github.com/instill-ai/pipeline-backend/pkg/component/internal/util"
)

// PageInfo defines the information of a page
type PageInfo struct {
    Link string `json:"link"`
    // Link: The URL of the page.
    Link string `json:"link"`
    // Title: The title of the page.
    Title string `json:"title"`
}
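For reference, a quick self-contained sketch of how this struct serializes, showing the field names produced by the json tags (the URL and title values are made up for the example):

```go
package main

import (
    "encoding/json"
    "fmt"
)

// PageInfo mirrors the struct above; the json tags determine the field
// names that appear in the component's output.
type PageInfo struct {
    Link  string `json:"link"`
    Title string `json:"title"`
}

func main() {
    b, _ := json.Marshal(PageInfo{
        Link:  "https://example.com/docs",
        Title: "Docs",
    })
    fmt.Println(string(b)) // {"link":"https://example.com/docs","title":"Docs"}
}
```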
@@ -38,9 +41,12 @@ type CrawlWebsiteInput struct {
    MaxDepth int `json:"max-depth"`
}

func (i *CrawlWebsiteInput) Preset() {
    if i.MaxK < 0 {
        i.MaxK = 0
func (i *CrawlWebsiteInput) preset() {
    if i.MaxK <= 0 {
        // When users set MaxK to 0, it means "infinite".
        // However, an infinite crawl causes performance problems,
        // so we fall back to a default value that keeps performance manageable.
        i.MaxK = 8000
Review comment: This limitation should be documented in the public README.
Reply: Do you mean to make it in …? This setting is just there to fix the performance issue, so users don't have to know about it; it is mainly for developers. I will add clearer comments here.
Reply: I updated it here.
    }
}
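To make the clamping behaviour concrete, here is a minimal, self-contained sketch of the same logic; the struct is reduced to the MaxK field purely for illustration and is not the component's actual type:

```go
package main

import "fmt"

// crawlInput stands in for CrawlWebsiteInput, keeping only MaxK.
type crawlInput struct {
    MaxK int
}

// preset clamps MaxK: zero (or a negative value) used to mean "infinite",
// but an unbounded crawl is too expensive, so a finite default is applied.
func (i *crawlInput) preset() {
    if i.MaxK <= 0 {
        i.MaxK = 8000
    }
}

func main() {
    for _, k := range []int{0, -5, 50} {
        in := crawlInput{MaxK: k}
        in.preset()
        fmt.Printf("MaxK %d -> %d\n", k, in.MaxK) // 0 and -5 become 8000, 50 stays 50
    }
}
```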
@@ -61,7 +67,7 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro
        return nil, fmt.Errorf("error converting input to struct: %v", err)
    }

    inputStruct.Preset()
    inputStruct.preset()

    output := ScrapeWebsiteOutput{}
@@ -70,45 +76,67 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro
    var mu sync.Mutex
    pageLinks := []string{}

    // We will have the component timeout feature in the future.
    // Before that, we initialize the context here.
    ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
    defer cancel()

    // On every <a> element that has an href attribute, call the callback.
    // It won't be called if an error occurs.
    c.OnHTML("a[href]", func(e *colly.HTMLElement) {
        mu.Lock()
Review comment (suggested change): The mutex needs to be unlocked no matter what. We can use …
        if ctx.Err() != nil {
            mu.Unlock()
            return
        }

        // If we appended PageInfo values directly to output.Pages, it would take longer
        // when the first HTML page has a lot of links.
        // To improve execution time for small Max-K values, we use a separate slice to store the links.
        // However, when K is big, the output length could be less than K.
        // So, I set twice the MaxK as the point to stop the scraping.
        if inputStruct.MaxK > 0 && len(pageLinks) >= inputStruct.MaxK*2 {
        if len(pageLinks) >= inputStruct.MaxK*getPageTimes(inputStruct.MaxK) {
            mu.Unlock()
            return
        }

        link := e.Attr("href")

        if util.InSlice(pageLinks, link) {
            mu.Unlock()
            return
        }

        pageLinks = append(pageLinks, link)
        mu.Unlock()

        _ = e.Request.Visit(link)
    })
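The suggestion above is to release the mutex with a single deferred unlock. As a rough illustration of that idea, and not the code in this PR, the guarded section of the OnHTML callback could be pulled into a helper so that every return path unlocks; shouldVisit, its parameters, and the inline dedup loop (standing in for util.InSlice) are hypothetical names used only in this sketch:

```go
package main

import (
    "context"
    "fmt"
    "sync"
)

// shouldVisit wraps the critical section of the link callback. The deferred
// unlock guarantees the mutex is released on every return path.
func shouldVisit(ctx context.Context, mu *sync.Mutex, pageLinks *[]string, link string, limit int) bool {
    mu.Lock()
    defer mu.Unlock()

    if ctx.Err() != nil || len(*pageLinks) >= limit {
        return false
    }
    for _, l := range *pageLinks {
        if l == link { // link already collected
            return false
        }
    }
    *pageLinks = append(*pageLinks, link)
    return true
}

func main() {
    var (
        mu    sync.Mutex
        links []string
    )
    ctx := context.Background()
    for _, l := range []string{"/a", "/b", "/a", "/c"} {
        fmt.Println(l, shouldVisit(ctx, &mu, &links, l, 3))
    }
    // Output: /a true, /b true, /a false (duplicate), /c true
}
```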

    // Set error handler
    c.OnError(func(r *colly.Response, err error) {
        log.Println("Request URL:", r.Request.URL, "failed with response:", r, "\nError:", err)
        // In the future, we can design the error handling logic.
    })
    c.OnRequest(func(r *colly.Request) {
        mu.Lock()

        // Until the output reaches the page limit, we should keep sending requests.
        if inputStruct.MaxK > 0 && len(output.Pages) >= inputStruct.MaxK {
        if (len(output.Pages) >= inputStruct.MaxK) || ctx.Err() != nil {
            r.Abort()
            mu.Unlock()
            return
        }

        mu.Unlock()
        // Set a random user agent to avoid being blocked by websites.
        r.Headers.Set("User-Agent", randomString())
    })
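The randomString() helper that supplies the User-Agent here is not shown in this diff. Purely as a hypothetical sketch, such a helper could pick from a small pool of common browser strings; the pool and function name below are illustrative, not the component's actual implementation:

```go
package main

import (
    "fmt"
    "math/rand"
)

// userAgents is an illustrative pool of browser User-Agent strings.
var userAgents = []string{
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64; rv:125.0) Gecko/20100101 Firefox/125.0",
}

// randomUserAgent returns one entry at random; setting it on each request
// makes the crawler's traffic look less uniform.
func randomUserAgent() string {
    return userAgents[rand.Intn(len(userAgents))]
}

func main() {
    fmt.Println(randomUserAgent())
}
```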

    c.OnResponse(func(r *colly.Response) {
        if ctx.Err() != nil {
            return
        }

        strippedURL := stripQueryAndTrailingSlash(r.Request.URL)
@@ -128,20 +156,36 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro
        title := util.ScrapeWebpageTitle(doc)
        page.Title = title

        defer mu.Unlock()
Review comment: Any reason why we don't use …?
Reply: Colly is hard to understand. I will clean up the code and make sure the performance stays the same.
        mu.Lock()
        // If we do not set this condition, the length of output.Pages could be over the limit.
        if len(output.Pages) < inputStruct.MaxK {
            output.Pages = append(output.Pages, page)

            // If the length of output.Pages is equal to MaxK, we should stop the scraping.
            if len(output.Pages) == inputStruct.MaxK {
                mu.Unlock()
                cancel()
                return
            }
            mu.Unlock()
            return
        }
        mu.Unlock()
        cancel()

    })

    // Start scraping
    if !strings.HasPrefix(inputStruct.URL, "http://") && !strings.HasPrefix(inputStruct.URL, "https://") {
        inputStruct.URL = "https://" + inputStruct.URL
    }
    _ = c.Visit(inputStruct.URL)
    c.Wait()

    go func() {
        _ = c.Visit(inputStruct.URL)
Review comment: Since it's in a goroutine now, do we still need to …?
Reply: Yes, we need it.
        c.Wait()
    }()

    <-ctx.Done()

    outputStruct, err := base.ConvertToStructpb(output)
    if err != nil {
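The pattern discussed above (run the crawl in a goroutine, keep c.Wait() inside it, and block the caller on <-ctx.Done()) can be shown in isolation. A minimal sketch with a stand-in function instead of colly; runWithDeadline is a hypothetical name:

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// runWithDeadline runs crawl in its own goroutine and returns when the
// context is done -- either because the deadline fired or because the crawl
// finished early and called cancel(), as CrawlWebsite does when MaxK pages
// have been collected.
func runWithDeadline(timeout time.Duration, crawl func(ctx context.Context, cancel context.CancelFunc)) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    go crawl(ctx, cancel)

    <-ctx.Done()
}

func main() {
    start := time.Now()
    runWithDeadline(2*time.Second, func(ctx context.Context, cancel context.CancelFunc) {
        // Stand-in for c.Visit + c.Wait: pretend the crawl finishes quickly
        // and signals completion by cancelling the context.
        time.Sleep(100 * time.Millisecond)
        cancel()
    })
    fmt.Println("returned after", time.Since(start).Round(10*time.Millisecond))
}
```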
@@ -179,19 +223,21 @@ func initColly(inputStruct CrawlWebsiteInput) *colly.Collector {
    )

    // Limit the number of requests to avoid being blocked.
    // Set it to 10 at first in case we send too many requests at once.
    var parallel int
    if inputStruct.MaxK < 10 {
    if inputStruct.MaxK < 30 {
        parallel = inputStruct.MaxK
    } else {
        parallel = 10
        parallel = 30
    }

    _ = c.Limit(&colly.LimitRule{
        DomainGlob:  "*",
        Parallelism: parallel,
        // We set the delay to avoid being blocked.
        Delay: 100 * time.Millisecond,
    })

    // The timeout here applies to each page rather than to the whole colly instance.
    c.SetRequestTimeout(time.Duration(inputStruct.Timeout) * time.Millisecond)

    if len(inputStruct.AllowedDomains) > 0 {

@@ -201,3 +247,12 @@ func initColly(inputStruct CrawlWebsiteInput) *colly.Collector {
    return c
}
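The throttling above relies on colly's limit rules. Here is a compact sketch of the same setup in isolation; the import path (github.com/gocolly/colly/v2) is an assumption, since the import is not part of this diff:

```go
package main

import (
    "time"

    "github.com/gocolly/colly/v2" // assumed import path; not shown in the diff
)

// newLimitedCollector mirrors the throttling in initColly: parallelism is
// capped at 30 (or at maxK when maxK is smaller) and a small delay is added
// between requests so the target site is less likely to block the crawler.
func newLimitedCollector(maxK, timeoutMs int) *colly.Collector {
    c := colly.NewCollector(colly.Async(true))

    parallel := 30
    if maxK < 30 {
        parallel = maxK
    }

    _ = c.Limit(&colly.LimitRule{
        DomainGlob:  "*",
        Parallelism: parallel,
        Delay:       100 * time.Millisecond,
    })

    // The request timeout applies per page, not to the whole crawl.
    c.SetRequestTimeout(time.Duration(timeoutMs) * time.Millisecond)

    return c
}

func main() {
    // A collector capped at 10 parallel requests with a 5s per-page timeout;
    // OnHTML/OnRequest/OnResponse callbacks would then be registered on it.
    _ = newLimitedCollector(10, 5000)
}
```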

// getPageTimes ensures that we fetch enough pages to get the required number of pages.
func getPageTimes(maxK int) int {
    if maxK < 10 {
        return 10
    } else {
        return 2
    }
}
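To see what this over-fetch factor does to the stop condition in the OnHTML callback (len(pageLinks) >= MaxK*getPageTimes(MaxK)), a tiny usage sketch:

```go
package main

import "fmt"

// getPageTimes reproduces the helper above: small MaxK values over-fetch by a
// factor of 10 (not every collected link yields an output page), larger ones by 2.
func getPageTimes(maxK int) int {
    if maxK < 10 {
        return 10
    }
    return 2
}

func main() {
    for _, k := range []int{3, 10, 100, 8000} {
        fmt.Printf("MaxK=%d -> collect up to %d links\n", k, k*getPageTimes(k))
    }
    // MaxK=3 -> 30 links, MaxK=10 -> 20, MaxK=100 -> 200, MaxK=8000 -> 16000
}
```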
Review comment: I think we should document the 120-second limitation in the task description, not on the max-k field. Though I value having a note here saying that, when the task timeout is reached, the available results are returned.