feat(web): add input schema to improve web operator #819
		}
	}
}()

<-ctx.Done()
Is this redundant with the previous `<-ctx.Done()`?
We need this to block the main execution.
Yeah, but you can skip the `go func()` in the previous block and run the `for` / `select` in the main thread, right? Otherwise it looks like you're dispatching work in order to wait for it immediately.
Oh, do you mean we actually don't need another goroutine for this block?
So `<-ctx.Done()` in the main thread is not necessary.
inactivityTimer := time.NewTimer(2 * time.Second)
defer inactivityTimer.Stop()
for {
	select {
	case <-pageUpdateCh:
		inactivityTimer.Reset(2 * time.Second)
	// If no new pages for 2 seconds, cancel the context
	case <-inactivityTimer.C:
		cancel()
		return
	// If the context is done, we should return
	case <-ctx.Done():
		return
	}
}
Almost what I meant, yes. In this case you'll need to break from the `for` loop instead of returning. But this whole block might not be necessary, depending on the thread about `c.Wait()`.
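As a minimal sketch of the "break instead of return" point: in Go, a bare `break` inside a `select` only leaves the `select`, so exiting the enclosing `for` needs a labeled break. The names `waitForInactivity` and `demo`, and all durations, are hypothetical and not taken from the PR; the stop-and-drain before `Reset` is an extra precaution for older Go timer semantics, not something the thread asks for.

```go
package main

import (
	"context"
	"time"
)

// waitForInactivity is a hypothetical helper, not the PR's code. It blocks in the
// calling goroutine until no page update arrives for `idle` or ctx is done, and
// reports which of the two happened.
func waitForInactivity(ctx context.Context, pageUpdateCh <-chan struct{}, idle time.Duration) string {
	reason := ""
	inactivityTimer := time.NewTimer(idle)
	defer inactivityTimer.Stop()

loop:
	for {
		select {
		case <-pageUpdateCh:
			// A page arrived: restart the inactivity window. Stop and drain first
			// so a stale tick cannot be received after Reset.
			if !inactivityTimer.Stop() {
				select {
				case <-inactivityTimer.C:
				default:
				}
			}
			inactivityTimer.Reset(idle)
		case <-inactivityTimer.C:
			reason = "inactive"
			break loop // a bare break would only leave the select
		case <-ctx.Done():
			reason = "context done"
			break loop
		}
	}
	// Unlike an early return, code placed here still runs after the loop ends.
	return reason
}

// demo is a hypothetical driver: it sends n page updates 5ms apart, then goes quiet.
func demo(n, idleMs, timeoutMs int) string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	ch := make(chan struct{})
	go func() {
		for i := 0; i < n; i++ {
			time.Sleep(5 * time.Millisecond)
			select {
			case ch <- struct{}{}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return waitForInactivity(ctx, ch, time.Duration(idleMs)*time.Millisecond)
}
```

With a few updates followed by silence the helper reports inactivity; with a steady stream of updates and a short timeout it reports that the context ended.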
> depending on the thread about `c.Wait()`

Yes, I will try it out later! Thanks for carefully reviewing!
> depending on the thread about `c.Wait()`

It does not work, so I modified the code here.
@@ -148,6 +148,11 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro
	r.Headers.Set("User-Agent", randomString())
})

// colly.Wait() does not terminate the program. So, we need a system to terminate the program when there is no collector.
// We use a channel to notify the main goroutine that a new page has been scraped.
// When there is no new page for 2 seconds, we cancel the context.
Silly question: is there a way to know there aren't remaining URLs to scrape (e.g. cancelling the context after `Wait` finishes)? Even if there's a sweet spot, waiting for any amount of time is always a compromise between being inefficient (waiting when we could return) and risking data loss (there's new data but it's taking longer than the threshold).
Originally, I expected colly to finish the job when there are no remaining URLs.
But I could not find a way to close colly when there are no callback jobs left.
So I decided to handle it this way, to cover the case where the output length hasn't reached max-k but there are no more URLs.
Something that isn't clear to me is what you mean by

> does not terminate the program. So, we need a system to terminate the program when there is no collector.

I'm not familiar with `colly`, so what I'm proposing might not make sense (in which case, I'm just trying to understand the code). Colly is a popular enough project, so my intuition tells me there has to be a simpler way to finish the execution when all the websites have been crawled. Also, I'm assuming that `c.Wait()` does work and that it halts the execution until all the sites have been scraped.
From the assumption I just mentioned, what I see you're trying to accomplish is waiting until the first occurrence between:
- All the sites have been scraped. `c.Wait()` will signal this.
- We've reached the 2 minute timeout. `ctx.Done()` will signal this.
- We've reached the limit of pages. `ctx.Done()` will also signal this, as we cancel the context when we find this condition.

Therefore, we need to wait for either `c.Wait()` or `ctx.Done()`. I think we can avoid timing the duration between 2 completed pages and select between `ctx.Done()` and a channel that is closed after `c.Wait()` ends:
scrapeDone := make(chan struct{})
go func() {
	defer close(scrapeDone)
	_ = c.Visit(inputStruct.URL)
	c.Wait()
}()
select {
case <-scrapeDone:
	// No more pages to scrape.
case <-ctx.Done():
	// Timeout or page limit reached
}
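The snippet above depends on the collector, so here is a self-contained sketch of the same select pattern that can be run in isolation. It assumes a plain `scrape func()` standing in for `c.Visit(...)` followed by `c.Wait()`; `waitForScrape` and `simulate` are hypothetical names, and the sleep only imitates crawling work.

```go
package main

import (
	"context"
	"time"
)

// waitForScrape runs scrape in its own goroutine and blocks until it returns or
// ctx ends, whichever comes first. scrape is an assumption of this sketch: it
// stands in for the Visit + Wait pair in the review snippet.
func waitForScrape(ctx context.Context, scrape func()) string {
	scrapeDone := make(chan struct{})
	go func() {
		defer close(scrapeDone) // closing signals every listener at once
		scrape()
	}()

	select {
	case <-scrapeDone:
		return "scrape finished" // no more pages to scrape
	case <-ctx.Done():
		return "context done" // timeout or page limit reached
	}
}

// simulate is a hypothetical driver: the fake scrape takes workMs and the
// context times out after timeoutMs.
func simulate(workMs, timeoutMs int) string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return waitForScrape(ctx, func() {
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	})
}
```

Note that when the context wins, the spawned goroutine keeps running until `scrape` returns on its own, so real scraping work would also need to observe the context.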
> All the sites have been scraped. `c.Wait()` will signal this.

Yes! That is the problem. I expected `c.Wait()` to finish when there are no more callbacks (jobs).
However, when I ran an end-to-end test, I noticed it did not close this goroutine.
If `c.Wait()` could close it, we wouldn't need another `chan` (if I understand it correctly).
> I noticed it did not close this goroutine.

I don't fully understand this. How were you checking the goroutine had ended? The moment you use `go func`, the main thread stops caring about the function you're calling, so it will never know when `c.Wait` finished. That's why in my snippet I added a channel that's closed at the end of the spawned function. By listening to that channel, the main thread is signaled about the wait group being unlocked.
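A small sketch of that signalling idea, using a plain `sync.WaitGroup` as a stand-in for whatever the collector waits on internally (an assumption of this sketch, not a statement about colly). `doneChan` and `runJobs` are hypothetical names.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// doneChan turns a blocking wg.Wait() into a channel that is closed once the
// wait group is released, so the caller can use it in a select.
func doneChan(wg *sync.WaitGroup) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	return done
}

// runJobs is a hypothetical driver: it starts n trivial jobs, waits on the done
// channel, and returns how many jobs had completed when the signal arrived.
func runJobs(n int) int64 {
	var wg sync.WaitGroup
	var completed int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&completed, 1)
		}()
	}
	<-doneChan(&wg) // the caller learns about completion only through this channel
	return atomic.LoadInt64(&completed)
}
```

Without the channel, the goroutine calling `wg.Wait()` would return silently and the caller would have no way to observe it.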
> Also, isn't it simpler to use channels here than mutexes?

> I am not quite sure which part this refers to. We do use channels to signal the stop process here, right?

What I mean is that you're defining variables outside of the goroutines and using mutexes to make sure you modify them atomically. To me, a much more natural pattern is using channels to communicate the information outside of the routines. There's this article and also the Go proverb:
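To illustrate the channel-based alternative to mutex-guarded shared variables described in this comment, here is a minimal sketch. `collectWithChannel` is a hypothetical function and the string concatenation merely stands in for scraping; it is not the component's code.

```go
package main

import "sync"

// collectWithChannel starts one worker per URL. Workers never touch the result
// slice: they send their page over a channel, and only the receiving goroutine
// appends, so no mutex is needed around the slice.
func collectWithChannel(urls []string) []string {
	results := make(chan string)
	var wg sync.WaitGroup

	for _, u := range urls {
		wg.Add(1)
		go func(u string) {
			defer wg.Done()
			results <- "scraped:" + u // stand-in for real scraping work
		}(u)
	}

	// Close the channel once every worker has sent its result, which ends the
	// range loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	var pages []string
	for p := range results {
		pages = append(pages, p)
	}
	return pages
}
```

The order of the returned pages depends on goroutine scheduling, so callers should not rely on it.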
> What I mean is that you're defining variables outside of the goroutines and using mutexes to make sure you modify them atomically.

I got what you mean here. You mean the output pages, right?
I will address this point in another PR and take care of it in future development.

> Adding this, we can check the 3 scenarios work (in your sample, we'd need to port that to the component code):

Let's go back to the current code (this PR), not the sample.
How about I do the following?
I will add back the `scrapeDone` channel and keep the "2 second part" in the codebase.
That way, we can deal with 4 scenarios.
- No more pages to scrape: c.Wait() returns and the goroutine closes scrapeDone. Program finishes before the timeout.
- Max pages scraped: c.OnResponse cancels the context / closes scrapeDone. Program finishes before the timeout.
- Max pages haven't been collected before the timeout. Context is canceled and program finishes at the timeout.
- Max pages have not been collected before the timeout. Context is canceled and the program finishes when there is no more valid data within 2 seconds.

The reason we need the 4th one is what I mentioned here.
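The four scenarios listed above can be sketched in one wait loop. This is only an illustration under stated assumptions: `waitForCrawl` and `simulateCrawl` are hypothetical names, the fake crawler replaces colly entirely, and telling scenario 2 from scenario 3 via `ctx.Err()` is a choice made for this sketch rather than something the PR is known to do.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// waitForCrawl blocks until one of the four end conditions occurs and names it.
func waitForCrawl(ctx context.Context, scrapeDone, pageUpdateCh <-chan struct{}, idle time.Duration) string {
	inactivityTimer := time.NewTimer(idle)
	defer inactivityTimer.Stop()
	for {
		select {
		case <-scrapeDone:
			return "no more pages" // scenario 1
		case <-ctx.Done():
			if errors.Is(ctx.Err(), context.DeadlineExceeded) {
				return "timeout" // scenario 3
			}
			return "page limit reached" // scenario 2: context was cancelled
		case <-pageUpdateCh:
			// Stop and drain before Reset so a stale tick cannot be received.
			if !inactivityTimer.Stop() {
				select {
				case <-inactivityTimer.C:
				default:
				}
			}
			inactivityTimer.Reset(idle)
		case <-inactivityTimer.C:
			return "inactive" // scenario 4: no new page within the idle window
		}
	}
}

// simulateCrawl is a hypothetical fake crawler. It reports `pages` pages, one
// every pageMs, cancels the context at maxPages, and closes scrapeDone at the
// end only when finishes is true (false imitates requests that never complete).
func simulateCrawl(pages, maxPages, pageMs, idleMs, timeoutMs int, finishes bool) string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	scrapeDone := make(chan struct{})
	pageUpdateCh := make(chan struct{})
	go func() {
		for i := 1; i <= pages; i++ {
			time.Sleep(time.Duration(pageMs) * time.Millisecond)
			select {
			case pageUpdateCh <- struct{}{}:
			case <-ctx.Done():
				return
			}
			if i == maxPages {
				cancel()
				return
			}
		}
		if finishes {
			close(scrapeDone)
		}
	}()
	return waitForCrawl(ctx, scrapeDone, pageUpdateCh, time.Duration(idleMs)*time.Millisecond)
}
```

The trade-off raised earlier in the thread still applies to scenario 4: a short idle window returns sooner but may drop pages that are merely slow.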
Agreed, make sure you document the 4th in the code 🤝 I updated the sample to use channels.
Added it back.
@jvallesm
Could you help me do a final check?
If there is no problem, I'll merge it and release it in this sprint.
@jvallesm I modified it again. Please take a look! Thank you!
Codecov Report

@@            Coverage Diff             @@
##             main     #819      +/-   ##
==========================================
+ Coverage   20.01%   20.50%   +0.48%
==========================================
  Files         354      359       +5
  Lines       74750    75198     +448
==========================================
+ Hits        14963    15416     +453
+ Misses      57571    57484      -87
- Partials     2216     2298      +82
I added some post-merge comments that aren't a blocker to move forward with deploying this.
// 1. No more pages to scrape: c.Wait() returns and the goroutine closes scrapeDone. Program finishes before the timeout.
// 2. Max pages scraped: c.OnResponse cancels the context / closes scrapeDone. Program finishes before the timeout.
// 3. Max pages haven't been collected before the timeout. Context is canceled and program finishes at the timeout.
// 4. Max pages haven't been collected before the timeout. Context is canceled and the program finishes when there are no more validate data in 2 seconds.
Suggested change:
- // 4. Max pages haven't been collected before the timeout. Context is canceled and the program finishes when there are no more validate data in 2 seconds.
+ // 4. Max pages haven't been collected but the delay between completed requests has increased over 2 seconds. We consider this data invalid because (insert reason here), so we avoid waiting for the timeout.
// 2. Max pages scraped: c.OnResponse cancels the context / closes scrapeDone. Program finishes before the timeout.
// 3. Max pages haven't been collected before the timeout. Context is canceled and program finishes at the timeout.
// 4. Max pages haven't been collected before the timeout. Context is canceled and the program finishes when there are no more validate data in 2 seconds.
// We use 4. to avoid the program waiting for 2 minutes to close all URLs that will wait for over timeout.
This is still not super clear to me 😅
	return
}

pageLinks = append(pageLinks, link)
requestURL := stripQueryAndTrailingSlash(parsedURL)
Why did you add this bit? Is it related to the URL filter?
🤖 I have created a release *beep* *boop*

---

## [0.47.0-beta](v0.46.0-beta...v0.47.0-beta) (2024-11-20)

### Features

* **component:** add support for event specifications ([#837](#837)) ([47a61cd](47a61cd))
* **component:** implement run-on-event for Slack and GitHub component ([#842](#842)) ([1b6a569](1b6a569))
* convert pdf to image concurrently ([#818](#818)) ([4c0ad97](4c0ad97))
* improve markdown chunking ([#822](#822)) ([af1a36a](af1a36a))
* **json:** Support Rename Fields for JSON operator ([#813](#813)) ([093714e](093714e))
* **recipe:** refactor run-on-event recipe structure ([#835](#835)) ([78ea418](78ea418))
* **recipe:** rename `instill-format` to `format` ([#798](#798)) ([80a9fc9](80a9fc9))
* **service:** implement PipelineErrorUpdated streaming event for pipeline errors ([#846](#846)) ([3156a5f](3156a5f))
* **vdp:** integrate blob storage to vdp ([#834](#834)) ([5311549](5311549))
* **web:** add input schema to improve web operator ([#819](#819)) ([f7e1fe9](f7e1fe9))

### Bug Fixes

* **data:** refactor numberData to support both float and integer types ([#832](#832)) ([cf27452](cf27452))
* **document:** fix bug about convert to image ([#848](#848)) ([a381c27](a381c27))
* fix bug about unit type ([#826](#826)) ([a89fdf7](a89fdf7))
* **integration-test:** maximize build space on image build & push ([#823](#823)) ([a439d22](a439d22))
* **run:** set pipeline run status as failed when component fails ([#836](#836)) ([70a5c52](70a5c52))
* **service:** add MIME type detection in the backend binaryFetcher ([#854](#854)) ([f434b2b](f434b2b))
* **service:** add missing nil check in includeIteratorComponentDetail() ([#831](#831)) ([9cb5e9e](9cb5e9e))
* **service:** skip empty component definition in API response ([#847](#847)) ([d61b55e](d61b55e))
* unit tests ([#820](#820)) ([717200c](717200c))
* **vdp:** item does not contain the instill format, so we insert it ([#858](#858)) ([2d25401](2d25401))
* **workflow:** allow integration usage within iterator ([#833](#833)) ([c9bd169](c9bd169))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).