Skip to content

Commit

Permalink
Merge pull request #20 from DrBlury/main
Browse files Browse the repository at this point in the history
Use go arenas, wait group, remove binary heap completely
  • Loading branch information
jinyus authored Sep 26, 2023
2 parents 751a9a6 + 2cdcd4f commit 96b0c43
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 61 deletions.
Empty file added go.work.sum
Empty file.
1 change: 0 additions & 1 deletion go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func main() {

var posts []Post
err = json.NewDecoder(file).Decode(&posts)

if err != nil {
log.Panicln(err)
}
Expand Down
2 changes: 0 additions & 2 deletions go_con/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,3 @@ module g.io/related_concurrent
go 1.21.1

require github.com/goccy/go-json v0.10.2

require github.com/ugurcsen/gods-generic v0.10.4 // indirect
2 changes: 0 additions & 2 deletions go_con/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/ugurcsen/gods-generic v0.10.4 h1:OomH3R2MdzZxpnEPijaD/ncLzV6rpDXd5ruEkWsw0vo=
github.com/ugurcsen/gods-generic v0.10.4/go.mod h1:mGYOa88Y5sbw+ADXLpScxjJ7s5iHoWya/YHyeQ4f6c4=
111 changes: 57 additions & 54 deletions go_con/main.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package main

import (
"arena"
"fmt"
"log"
"os"
"runtime"
"sync"
"time"

"github.com/goccy/go-json"
"github.com/ugurcsen/gods-generic/trees/binaryheap"
)

// custom type alias - for easier experiments with int size
// using smaller than int64 integer size but still big enough for 4 billion posts
type isize uint32

var concurrency = isize(runtime.NumCPU())
var a *arena.Arena

type isize uint32

type Post struct {
ID string `json:"_id"`
Expand Down Expand Up @@ -45,8 +47,12 @@ func main() {
if err != nil {
log.Panicln(err)
}
defer file.Close()

a = arena.NewArena() // Create a new arena

var posts []Post
posts = arena.MakeSlice[Post](a, 0, 10000)

err = json.NewDecoder(file).Decode(&posts)
if err != nil {
Expand All @@ -55,7 +61,8 @@ func main() {

start := time.Now()

postsLength := isize(len(posts))
postsLength := len(posts)
postsLengthISize := isize(postsLength)

tagMap := make(map[string][]isize, 100)
for i, post := range posts {
Expand All @@ -64,36 +71,30 @@ func main() {
}
}

resultsChan := make(chan Result, postsLength)
doneChan := make(chan bool, concurrency)
resultsChan := make(chan Result, postsLengthISize)

// create wait group to wait for all workers to finish
wg := sync.WaitGroup{}
wg.Add(int(concurrency))
var w isize
for ; w < concurrency; w++ {
for ; w < isize(concurrency); w++ {
// allocate taggedPostCount for each worker once, zero out for each task
taggedPostCount := make([]isize, postsLength)
t5 := binaryheap.NewWith[PostWithSharedTags](PostComparator)
taggedPostCount := arena.MakeSlice[isize](a, postsLength, postsLength)
go func(workerID isize) {
for i := workerID; i < postsLength; i += concurrency {
for i := workerID; i < postsLengthISize; i += concurrency {
// provide taggedPostCount and binary heap for each task
resultsChan <- Result{
Index: isize(i),
RelatedPost: computeRelatedPost(i, posts, tagMap, taggedPostCount, t5),
Index: i,
RelatedPost: computeRelatedPost(i, posts, tagMap, taggedPostCount),
}
}

doneChan <- true
wg.Done()
}(w)
}

var i isize
for ; i < concurrency; i++ {
<-doneChan
}

wg.Wait()
close(resultsChan)
close(doneChan)

allRelatedPosts := make([]RelatedPosts, len(posts))
allRelatedPosts := arena.MakeSlice[RelatedPosts](a, postsLength, postsLength)
for r := range resultsChan {
allRelatedPosts[r.Index] = r.RelatedPost
}
Expand All @@ -111,40 +112,52 @@ func main() {
if err != nil {
log.Panicln(err)
}
a.Free()
}

func computeRelatedPost(i isize, posts []Post, tagMap map[string][]isize, taggedPostCount []isize, t5 *binaryheap.Heap[PostWithSharedTags]) RelatedPosts {

// zero out tagged post count
for i := range taggedPostCount {
taggedPostCount[i] = 0
func computeRelatedPost(i isize, posts []Post, tagMap map[string][]isize, taggedPostCount []isize) RelatedPosts {
// Zero out tagged post count
for j := range taggedPostCount {
taggedPostCount[j] = 0
}

// Count the number of tags shared between posts
for _, tag := range posts[i].Tags {
for _, otherPostIdx := range tagMap[tag] {
taggedPostCount[otherPostIdx]++
if otherPostIdx != i { // Exclude the post itself
taggedPostCount[otherPostIdx]++
}
}
}
// zero out current post count - no need for branch in a for loop above
taggedPostCount[i] = 0

for v, count := range taggedPostCount {
if t5.Size() < 5 {
t5.Push(PostWithSharedTags{Post: isize(v), SharedTags: count})
} else {
if t, _ := t5.Peek(); t.SharedTags < count {
t5.Pop()
t5.Push(PostWithSharedTags{Post: isize(v), SharedTags: count})

top5 := [5]PostWithSharedTags{}
minTags := isize(0) // Updated initialization

for j, count := range taggedPostCount {
if count > minTags {
// Find the position to insert
pos := 4
for pos >= 0 && top5[pos].SharedTags < count {
pos--
}
pos++

// Shift and insert
if pos < 4 {
copy(top5[pos+1:], top5[pos:4])
}
if pos <= 4 {
top5[pos] = PostWithSharedTags{Post: isize(j), SharedTags: count}
minTags = top5[4].SharedTags
}
}
}

num := min(5, t5.Size())
topPosts := make([]*Post, num)

for i := 0; i < num; i++ {
if t, ok := t5.Pop(); ok {
topPosts[i] = &posts[t.Post]
// Convert indexes back to Post pointers
topPosts := make([]*Post, 0, 5)
for _, t := range top5 {
if t.SharedTags > 0 {
topPosts = append(topPosts, &posts[t.Post])
}
}

Expand All @@ -154,13 +167,3 @@ func computeRelatedPost(i isize, posts []Post, tagMap map[string][]isize, tagged
Related: topPosts,
}
}

func PostComparator(a, b PostWithSharedTags) int {
if a.SharedTags > b.SharedTags {
return 1
}
if a.SharedTags < b.SharedTags {
return -1
}
return 0
}
3 changes: 1 addition & 2 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ run_go() {
run_go_concurrent() {
echo "Running Go with concurrency" &&
cd ./go_con &&
go build &&
GOEXPERIMENT=arenas go build &&
if [ $HYPER == 1 ]; then
command hyperfine -r 10 -w 3 --show-output "./related_concurrent"
else
command time -f '%es %Mk' ./related_concurrent
fi

check_output "related_posts_go_con.json"

}

run_rust() {
Expand Down

0 comments on commit 96b0c43

Please sign in to comment.