Skip to content

Commit

Permalink
Merge pull request #17 from tirprox/main
Browse files Browse the repository at this point in the history
golang concurrency improvements
  • Loading branch information
jinyus authored Sep 26, 2023
2 parents 0e9451a + ce0169b commit 7a56711
Showing 1 changed file with 42 additions and 22 deletions.
64 changes: 42 additions & 22 deletions go_con/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import (
"github.com/ugurcsen/gods-generic/trees/binaryheap"
)

var concurrency = runtime.NumCPU()
// custom type alias - for easier experiments with int size
// using smaller than int64 integer size but still big enough for 4 billion posts
type isize uint32

var concurrency = isize(runtime.NumCPU())

type Post struct {
ID string `json:"_id"`
Expand All @@ -20,8 +24,8 @@ type Post struct {
}

type PostWithSharedTags struct {
Post int
SharedTags int
Post isize
SharedTags isize
}

type RelatedPosts struct {
Expand All @@ -32,7 +36,7 @@ type RelatedPosts struct {

// Result struct to hold results from goroutines
type Result struct {
Index int
Index isize
RelatedPost RelatedPosts
}

Expand All @@ -43,36 +47,49 @@ func main() {
}

var posts []Post

err = json.NewDecoder(file).Decode(&posts)
if err != nil {
log.Panicln(err)
}

start := time.Now()

tagMap := make(map[string][]int, 100)
postsLength := isize(len(posts))

tagMap := make(map[string][]isize, 100)
for i, post := range posts {
for _, tag := range post.Tags {
tagMap[tag] = append(tagMap[tag], i)
tagMap[tag] = append(tagMap[tag], isize(i))
}
}

resultsChan := make(chan Result, len(posts))
resultsChan := make(chan Result, postsLength)
doneChan := make(chan bool, concurrency)

for w := 0; w < concurrency; w++ {
go func(workerID int) {
defer func() { doneChan <- true }()
for i := workerID; i < len(posts); i += concurrency {
relatedPost := computeRelatedPost(i, posts, tagMap)
resultsChan <- Result{Index: i, RelatedPost: relatedPost}
var w isize
for ; w < concurrency; w++ {
// allocate taggedPostCount for each worker once, zero out for each task
taggedPostCount := make([]isize, postsLength)
t5 := binaryheap.NewWith[PostWithSharedTags](PostComparator)
go func(workerID isize) {
for i := workerID; i < postsLength; i += concurrency {
// provide taggedPostCount and binary heap for each task
resultsChan <- Result{
Index: isize(i),
RelatedPost: computeRelatedPost(i, posts, tagMap, taggedPostCount, t5),
}
}

doneChan <- true
}(w)
}

for i := 0; i < concurrency; i++ {
var i isize
for ; i < concurrency; i++ {
<-doneChan
}

close(resultsChan)
close(doneChan)

Expand All @@ -96,25 +113,28 @@ func main() {
}
}

func computeRelatedPost(i int, posts []Post, tagMap map[string][]int) RelatedPosts {
taggedPostCount := make([]int, len(posts))
t5 := binaryheap.NewWith[PostWithSharedTags](PostComparator)
func computeRelatedPost(i isize, posts []Post, tagMap map[string][]isize, taggedPostCount []isize, t5 *binaryheap.Heap[PostWithSharedTags]) RelatedPosts {

// zero out tagged post count
for i := range taggedPostCount {
taggedPostCount[i] = 0
}

for _, tag := range posts[i].Tags {
for _, otherPostIdx := range tagMap[tag] {
if otherPostIdx != i {
taggedPostCount[otherPostIdx]++
}
taggedPostCount[otherPostIdx]++
}
}
// zero out current post count - no need for branch in a for loop above
taggedPostCount[i] = 0

for v, count := range taggedPostCount {
if t5.Size() < 5 {
t5.Push(PostWithSharedTags{Post: v, SharedTags: count})
t5.Push(PostWithSharedTags{Post: isize(v), SharedTags: count})
} else {
if t, _ := t5.Peek(); t.SharedTags < count {
t5.Pop()
t5.Push(PostWithSharedTags{Post: v, SharedTags: count})
t5.Push(PostWithSharedTags{Post: isize(v), SharedTags: count})
}
}
}
Expand Down

0 comments on commit 7a56711

Please sign in to comment.