Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(web): add input schema to improve web operator #819

Merged
merged 12 commits into from
Nov 18, 2024

Conversation

chuang8511
Copy link
Contributor

@chuang8511 chuang8511 commented Nov 6, 2024

Because

  • we want to filter urls

This commit

  • add the filter function in web operator
  • add termination system for context
  • fix compogen bug

Note

  • we update compogen to add blank line back in template. It is the reason why there are much more files changed.

Copy link

linear bot commented Nov 6, 2024

pkg/component/operator/web/v0/config/tasks.json Outdated Show resolved Hide resolved
pkg/component/operator/web/v0/config/tasks.json Outdated Show resolved Hide resolved
pkg/component/operator/web/v0/config/tasks.json Outdated Show resolved Hide resolved
pkg/component/operator/web/v0/crawl_website.go Outdated Show resolved Hide resolved
pkg/component/operator/web/v0/config/tasks.json Outdated Show resolved Hide resolved
pkg/component/operator/web/v0/helper.go Outdated Show resolved Hide resolved
}
}
}()

<-ctx.Done()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this redundant with the previous <-ctx.Done()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this to block the main execution.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but you can skip the go func() in the previous block and run the for / select in the main thread, right? Otherwise it looks like you're dispatching work in order to wait for it immediately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, do you mean we actually don't need another go-routine for this block?
So, <-ctx.Done() in the main thread is not necessary.

		inactivityTimer := time.NewTimer(2 * time.Second)
		defer inactivityTimer.Stop()

		for {
			select {
			case <-pageUpdateCh:
				inactivityTimer.Reset(2 * time.Second)
			// If no new pages for 2 seconds, cancel the context
			case <-inactivityTimer.C:
				cancel()
				return
			// If the context is done, we should return
			case <-ctx.Done():
				return
			}
		}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost what I meant, yes. In this case you'll need to break from the for loop instead of returning. But this whole block might not be necessary, depending on the thread about c.Wait()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

depending on the thread about c.Wait()

Yes, I will try it out later! Thanks for carefully reviewing!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

depending on the thread about c.Wait()

It does not work, so I modified the code here.

pkg/component/operator/web/v0/crawl_website.go Outdated Show resolved Hide resolved
pkg/component/operator/web/v0/crawl_website.go Outdated Show resolved Hide resolved
@@ -148,6 +148,11 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro
r.Headers.Set("User-Agent", randomString())
})

// colly.Wait() does not terminate the program. So, we need a system to terminate the program when there is no collector.
// We use a channel to notify the main goroutine that a new page has been scraped.
// When there is no new page for 2 seconds, we cancel the context.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silly question: is there a way to know there aren't remaining URLs to scrape (e.g. cancelling the context after Wait finishes)? Even if there's a sweet spot, waiting for any amount of time is always a compromise between being inefficient (waiting when we could return) and risking data loss (there's new data but it's taking longer than the threshold).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally, I expected colly should finish the job when there aren't remaining URLs.

But, I could not find a way to close colly when there is no callback jobs.
So, I decided to use this way to deal with the case that the output length hasn't achieved the max-k but there is no URLs.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that isn't clear to me is what you mean by

does not terminate the program. So, we need a system to terminate the program when there is no collector.

I'm not familiar with colly, so what I'm proposing might not have sense (in which case, I'm just trying to understand the code). Colly is a popular enough project, so my intuition tells me there has to be a simpler way to finish the execution when all the websites have been crawled. Also I'm assuming that c.Wait() does work and that it halts the execution until all the sites have been scraped.

From the assumption I just mentioned, what I see you're trying to accomplish is waiting until the first occurrence between:

  • All the sites have been scraped. c.Wait() will signal this.
  • We've reached the 2 minute timeout. ctx.Done() will signal this.
  • We've reached the limit of pages. ctx.Done() will also signal this, as we cancel the context when we find this condition.

Therefore, we need to wait for either c.Wait() or ctx.Done(). I think we can avoid timing the duration between 2 completed pages and select between ctx.Done() and a channel that is closed after c.Wait() ends:

	scrapeDone := make(chan struct{})

	go func() {
		defer close(scrapeDone)
		_ = c.Visit(inputStruct.URL)
		c.Wait()
	}()

	select {
	case <- scrapeDone:
		// No more pages to scrape.
	case <-ctx.Done():
		// Timeout or page limit reached
	}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the sites have been scraped. c.Wait() will signal this.

Yes! It is the problem. I expect c.Wait() will finish when there is no more callback (job).
However, when I did end-to-end test, I noticed it did not close this go-routine.
If c.Wait() can close it, we won't need another chan(If I understand it correctly).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed it did not close this go-routine.

I don't fully understand this. How were you checking the goroutine had ended? The moment you use go func, the main thread stops caring about the function you're calling, so it will never know when c.Wait finished. That's why in my snippet I added a channel that's closed at the end of the spawned function. By listening to that channel, the main thread is signaled about the wait group being unlocked.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, isn't it simpler to use channels here than mutexes?

I am not quite sure what it specifically means in which part. We do use channels to signal the stop process here. right?

What I mean is that you're defining variables outside of the goroutines and using mutexes to make sure you modify them atomically. To me a much more natural pattern is using channels to communicate the information outside of the routines. There's this article and also the go proverb:

Channels orchestrate; mutexes serialize..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that you're defining variables outside of the goroutines and using mutexes to make sure you modify them atomically.

I got what you mean here. You mean the output pages. right?
I will address this point in other PR and take care of this part in the future development.

Adding this, we can check the 3 scenarios work (in your sample, we'd need to port that to the component code):

Let's back to the current code(PR) not sample one.
How about I do the following thing.

I will add back the scrapeDone channel and remain the "2 second part" in the codebase.
So, we can deal with 4 scenarios.

  1. No more pages to scrape: c.Wait() returns and the goroutine closes scrapeDone. Program finishes before the timeout.
  2. Max pages scraped: c.OnResponse cancels the context / closes scrapeDone. Program finishes before the timeout.
  3. Max pages haven't been collected before the timeout. Context is canceled and program finishes at the timeout.
  4. Max pages have not been collected before the timeout. Context is canceled and the program finishes when there are no more validate data in 2 seconds.

And, why we need the 4th one is because what I mentioned here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, make sure you document the 4th in the code 🤝 I updated the sample to use channels.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added it back.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jvallesm
Could you help me take final check?
If there is no problem, I'd merge it and release it in this sprint.

@chuang8511 chuang8511 changed the title feat: add input schema to improve web operator feat(web): add input schema to improve web operator Nov 11, 2024
@chuang8511
Copy link
Contributor Author

@jvallesm
Thanks for reviewing.
I modified and replied all of them.
Please take a look when you have time!

@chuang8511 chuang8511 force-pushed the chunhao/ins-6739-web-operator-improve branch from b07a167 to 9d4cd61 Compare November 11, 2024 19:07
}
}
}()

<-ctx.Done()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but you can skip the go func() in the previous block and run the for / select in the main thread, right? Otherwise it looks like you're dispatching work in order to wait for it immediately.

@@ -148,6 +148,11 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro
r.Headers.Set("User-Agent", randomString())
})

// colly.Wait() does not terminate the program. So, we need a system to terminate the program when there is no collector.
// We use a channel to notify the main goroutine that a new page has been scraped.
// When there is no new page for 2 seconds, we cancel the context.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that isn't clear to me is what you mean by

does not terminate the program. So, we need a system to terminate the program when there is no collector.

I'm not familiar with colly, so what I'm proposing might not have sense (in which case, I'm just trying to understand the code). Colly is a popular enough project, so my intuition tells me there has to be a simpler way to finish the execution when all the websites have been crawled. Also I'm assuming that c.Wait() does work and that it halts the execution until all the sites have been scraped.

From the assumption I just mentioned, what I see you're trying to accomplish is waiting until the first occurrence between:

  • All the sites have been scraped. c.Wait() will signal this.
  • We've reached the 2 minute timeout. ctx.Done() will signal this.
  • We've reached the limit of pages. ctx.Done() will also signal this, as we cancel the context when we find this condition.

Therefore, we need to wait for either c.Wait() or ctx.Done(). I think we can avoid timing the duration between 2 completed pages and select between ctx.Done() and a channel that is closed after c.Wait() ends:

	scrapeDone := make(chan struct{})

	go func() {
		defer close(scrapeDone)
		_ = c.Visit(inputStruct.URL)
		c.Wait()
	}()

	select {
	case <- scrapeDone:
		// No more pages to scrape.
	case <-ctx.Done():
		// Timeout or page limit reached
	}

@chuang8511
Copy link
Contributor Author

@jvallesm I modified it again. Please take a look! Thank you!

@chuang8511 chuang8511 force-pushed the chunhao/ins-6739-web-operator-improve branch from 4a9055a to e3a186f Compare November 15, 2024 09:33
Copy link

codecov bot commented Nov 15, 2024

Codecov Report

Attention: Patch coverage is 25.00000% with 45 lines in your changes missing coverage. Please review.

Project coverage is 20.50%. Comparing base (7e5d3de) to head (e3a186f).
Report is 20 commits behind head on main.

Files with missing lines Patch % Lines
pkg/component/operator/web/v0/crawl_website.go 13.46% 43 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #819      +/-   ##
==========================================
+ Coverage   20.01%   20.50%   +0.48%     
==========================================
  Files         354      359       +5     
  Lines       74750    75198     +448     
==========================================
+ Hits        14963    15416     +453     
+ Misses      57571    57484      -87     
- Partials     2216     2298      +82     
Flag Coverage Δ
unittests 20.50% <25.00%> (+0.48%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@chuang8511 chuang8511 force-pushed the chunhao/ins-6739-web-operator-improve branch from e3a186f to cdfcada Compare November 18, 2024 15:11
@chuang8511 chuang8511 requested a review from jvallesm November 18, 2024 16:53
@chuang8511 chuang8511 merged commit f7e1fe9 into main Nov 18, 2024
12 checks passed
@chuang8511 chuang8511 deleted the chunhao/ins-6739-web-operator-improve branch November 18, 2024 17:38
Copy link
Collaborator

@jvallesm jvallesm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some post-merge comments that aren't a blocker to move forward with deploying this.

// 1. No more pages to scrape: c.Wait() returns and the goroutine closes scrapeDone. Program finishes before the timeout.
// 2. Max pages scraped: c.OnResponse cancels the context / closes scrapeDone. Program finishes before the timeout.
// 3. Max pages haven't been collected before the timeout. Context is canceled and program finishes at the timeout.
// 4. Max pages haven't been collected before the timeout. Context is canceled and the program finishes when there are no more validate data in 2 seconds.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// 4. Max pages haven't been collected before the timeout. Context is canceled and the program finishes when there are no more validate data in 2 seconds.
// 4. Max pages haven't been collected but the delay between completed requests has increased over 2 seconds. We consider this data invalid because (insert reason here), so we avoid waiting for the timeout.

// 2. Max pages scraped: c.OnResponse cancels the context / closes scrapeDone. Program finishes before the timeout.
// 3. Max pages haven't been collected before the timeout. Context is canceled and program finishes at the timeout.
// 4. Max pages haven't been collected before the timeout. Context is canceled and the program finishes when there are no more validate data in 2 seconds.
// We use 4. to avoid the program waiting for 2 minutes to close all URLs that will wait for over timeout.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still not super clear to me 😅

return
}

pageLinks = append(pageLinks, link)
requestURL := stripQueryAndTrailingSlash(parsedURL)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you add this bit? Is it related with the URL filter?

donch1989 pushed a commit that referenced this pull request Nov 20, 2024
🤖 I have created a release *beep* *boop*
---


##
[0.47.0-beta](v0.46.0-beta...v0.47.0-beta)
(2024-11-20)


### Features

* **component:** add support for event specifications
([#837](#837))
([47a61cd](47a61cd))
* **component:** implement run-on-event for Slack and GitHub component
([#842](#842))
([1b6a569](1b6a569))
* convert pdf to image concurrently
([#818](#818))
([4c0ad97](4c0ad97))
* improve markdown chunking
([#822](#822))
([af1a36a](af1a36a))
* **json:** Support Rename Fields for JSON operator
([#813](#813))
([093714e](093714e))
* **recipe:** refactor run-on-event recipe structure
([#835](#835))
([78ea418](78ea418))
* **recipe:** rename `instill-format` to `format`
([#798](#798))
([80a9fc9](80a9fc9))
* **service:** implement PipelineErrorUpdated streaming event for
pipeline errors
([#846](#846))
([3156a5f](3156a5f))
* **vdp:** integrate blob storage to vdp
([#834](#834))
([5311549](5311549))
* **web:** add input schema to improve web operator
([#819](#819))
([f7e1fe9](f7e1fe9))


### Bug Fixes

* **data:** refactor numberData to support both float and integer types
([#832](#832))
([cf27452](cf27452))
* **document:** fix bug about convert to image
([#848](#848))
([a381c27](a381c27))
* fix bug about unit type
([#826](#826))
([a89fdf7](a89fdf7))
* **integration-test:** maximize build space on image build & push
([#823](#823))
([a439d22](a439d22))
* **run:** set pipeline run status as failed when component fails
([#836](#836))
([70a5c52](70a5c52))
* **service:** add MIME type detection in the backend binaryFetcher
([#854](#854))
([f434b2b](f434b2b))
* **service:** add missing nil check in includeIteratorComponentDetail()
([#831](#831))
([9cb5e9e](9cb5e9e))
* **service:** skip empty component definition in API response
([#847](#847))
([d61b55e](d61b55e))
* unit tests
([#820](#820))
([717200c](717200c))
* **vdp:** item does not contain the instill format, so we insert it
([#858](#858))
([2d25401](2d25401))
* **workflow:** allow integration usage within iterator
([#833](#833))
([c9bd169](c9bd169))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
No open projects
Status: No status
Development

Successfully merging this pull request may close these issues.

3 participants