Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MapReduce Lab Solution #1

Merged
merged 10 commits into from
May 2, 2024
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pkg/
api.key
.api.key.trimmed
*-handin.tar.gz
.idea/
2 changes: 1 addition & 1 deletion src/main/test-mr.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#

# un-comment this to run the tests with the Go race detector.
# RACE=-race
RACE=-race

if [[ "$OSTYPE" = "darwin"* ]]
then
Expand Down
210 changes: 187 additions & 23 deletions src/mr/coordinator.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,151 @@
package mr

import "log"
import (
"crypto/rand"
"encoding/base64"
"encoding/gob"
"fmt"
"log"
"sync"
"time"
)
import "net"
import "os"
import "net/rpc"
import "net/http"

// JobStatus is the lifecycle state of a Job; JobsType says whether a Job is
// map or reduce work.
type (
	JobStatus int
	JobsType  int
)

// Lifecycle states of a Job.
const (
	// Ready jobs are the ones GetTask hands out.
	Ready = JobStatus(iota)
	// WaitingForMap is the initial state of reduce jobs; JobFinish moves them
	// to Ready once every map job is Completed.
	WaitingForMap
	// InProgress is set by GetTask when a job is handed to a worker.
	InProgress
	// Completed is set by JobFinish.
	Completed
)

// Kinds of Job.
const (
	Map = JobsType(iota)
	Reduce
)

// Job is one unit of map or reduce work tracked by the Coordinator.
type Job struct {
// Id is the random identifier produced by newJobId; JobFinish looks jobs up by it.
Id string
// BucketNumber is the reduce bucket index; MakeCoordinator sets it only on reduce jobs.
BucketNumber int
// JobFile is the input file; MakeCoordinator sets it only on map jobs.
JobFile string
// WorkerId is the worker GetTask most recently handed this job to.
WorkerId string
Status JobStatus
JobsType JobsType
// StartTime is when GetTask most recently handed the job out;
// CleanLongRunningTasks compares against it to requeue stalled jobs.
StartTime time.Time
}

// Coordinator tracks every map and reduce job and serves them to workers
// through the GetTask and JobFinish RPC handlers.
type Coordinator struct {
// Jobs holds the map jobs followed by the reduce jobs, in the order
// MakeCoordinator appends them.
Jobs []Job
// Mutex is locked by GetTask, JobFinish, Done and CleanLongRunningTasks
// before they read or modify Jobs.
Mutex sync.Mutex
// ReduceBucketCount is the nReduce value passed to MakeCoordinator; GetTask
// sends it to map workers as MapTaskReply.BucketCount.
ReduceBucketCount int
}

// init registers with gob every concrete type that GetTask may store in the
// TaskReply interface field of GetTaskReply.
func init() {
	replyKinds := []interface{}{
		MapTaskReply{},
		ReduceTaskReply{},
		ExitTaskReply{},
		WaitTaskReply{},
	}
	for _, kind := range replyKinds {
		gob.Register(kind)
	}
}

// RPC handlers for the worker to call.

// GetTask is the RPC handler a worker calls to ask for work. While holding
// c.Mutex it fills reply.TaskReply with:
//   - ExitTaskReply when every job in c.Jobs is Completed,
//   - WaitTaskReply when no job is currently Ready,
//   - otherwise a MapTaskReply or ReduceTaskReply for the first Ready job,
//     which is marked InProgress, stamped with the current time and tagged
//     with args.WorkerId.
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
c.Mutex.Lock()
defer c.Mutex.Unlock()

// if all tasks are completed, tell the worker to exit
var allCompleted = true
for _, task := range c.Jobs {
if task.Status != Completed {
allCompleted = false
break
}
}
if allCompleted {
reply.TaskReply = ExitTaskReply{}
return nil
}

// pick the first Ready job; the pointer goes into c.Jobs (not the loop copy)
// so the assignments below update the stored job
var readyJob *Job
for i, task := range c.Jobs {
if task.Status == Ready {
readyJob = &c.Jobs[i]
break
}
}

// if no tasks to assign, make the worker wait
if readyJob == nil {
reply.TaskReply = WaitTaskReply{}
return nil
}

// claim the job for this worker; StartTime is what CleanLongRunningTasks
// checks to requeue jobs that stay InProgress too long
readyJob.WorkerId = args.WorkerId
readyJob.Status = InProgress
readyJob.StartTime = time.Now()

// build the reply that matches the job's JobsType field
switch readyJob.JobsType {
case Map:
reply.TaskReply = MapTaskReply{
JobFile: readyJob.JobFile,
BucketCount: c.ReduceBucketCount,
JobId: readyJob.Id,
}
case Reduce:
reply.TaskReply = ReduceTaskReply{
BucketNumber: readyJob.BucketNumber,
JobId: readyJob.Id,
}
}

// NOTE(review): GetTask has no `return nil` / closing brace before the
// Example handler below starts. This looks like pre-change and post-change
// lines of a diff rendered together -- confirm against the real file which
// of these lines are still present.
//
// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
reply.Y = args.X + 1
return nil
}

// JobFinish is the RPC handler a worker calls to report that the job with id
// args.JobId is done. The first job with that id is marked Completed; ids
// that match no job, and jobs that are already Completed, are ignored. When
// the reported job is a map job and every other map job is already Completed,
// all reduce jobs still WaitingForMap are switched to Ready. It always
// returns nil and does not write to reply.
func (c *Coordinator) JobFinish(args *JobFinishArgs, reply *JobFinishReply) error {
	c.Mutex.Lock()
	defer c.Mutex.Unlock()

	// Locate the first job carrying the reported id.
	idx := -1
	for i := range c.Jobs {
		if c.Jobs[i].Id == args.JobId {
			idx = i
			break
		}
	}

	// Nothing to do for an unknown id or a repeated report.
	if idx < 0 || c.Jobs[idx].Status == Completed {
		return nil
	}
	finished := &c.Jobs[idx]

	if finished.JobsType == Map {
		// The finishing job is still not Completed at this point, so it is
		// skipped (by id) while looking for unfinished map work.
		mapsPending := false
		for k := range c.Jobs {
			other := &c.Jobs[k]
			if other.Id != finished.Id && other.JobsType == Map && other.Status != Completed {
				mapsPending = true
				break
			}
		}

		// Last map job done: release the reduce jobs.
		if !mapsPending {
			for k := range c.Jobs {
				if c.Jobs[k].JobsType == Reduce && c.Jobs[k].Status == WaitingForMap {
					c.Jobs[k].Status = Ready
				}
			}
		}
	}

	finished.Status = Completed
	return nil
}

//
// start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() {
rpc.Register(c)
rpc.HandleHTTP()
Expand All @@ -41,30 +159,76 @@ func (c *Coordinator) server() {
go http.Serve(l, nil)
}

//
// newJobId returns a random job identifier: 16 bytes read from crypto/rand,
// encoded with the padded URL-safe base64 alphabet. If random bytes cannot be
// read the process is terminated through log.Fatal.
func newJobId() string {
	var raw [16]byte
	if _, err := rand.Read(raw[:]); err != nil {
		log.Fatal(err)
	}
	return base64.URLEncoding.EncodeToString(raw[:])
}

// Done reports whether the entire job has finished, i.e. whether every job in
// c.Jobs has Status Completed (an empty job list counts as finished).
// main/mrcoordinator.go calls Done() periodically to find this out.
// The scan runs with c.Mutex held.
func (c *Coordinator) Done() bool {
	c.Mutex.Lock()
	defer c.Mutex.Unlock()

	for i := range c.Jobs {
		if c.Jobs[i].Status != Completed {
			return false
		}
	}
	return true
}

// CleanLongRunningTasks loops forever: once per second it takes c.Mutex and
// puts back to Ready every job that has been InProgress for more than ten
// seconds since its StartTime, so that GetTask can hand it out again.
// It never returns; MakeCoordinator starts it in its own goroutine.
func (c *Coordinator) CleanLongRunningTasks() {
	// How long a job may stay InProgress before it is requeued.
	const staleAfter = 10 * time.Second

	// The sweep is a closure so the deferred Unlock runs after every pass
	// instead of never (the outer loop does not return).
	requeueStale := func() {
		c.Mutex.Lock()
		defer c.Mutex.Unlock()

		for i := range c.Jobs {
			job := &c.Jobs[i]
			if job.Status == InProgress && time.Since(job.StartTime) > staleAfter {
				job.Status = Ready
			}
		}
	}

	for {
		requeueStale()
		time.Sleep(time.Second)
	}
}

// MakeCoordinator creates a Coordinator.
// main/mrcoordinator.go calls this function.
// files are the map inputs (one map job per file) and nReduce is the number
// of reduce tasks to use (one reduce job per bucket 0..nReduce-1).
//
// Reduce jobs normally start as WaitingForMap and are released by JobFinish
// when the last map job completes. With no input files there is no map job to
// trigger that, so the reduce jobs start as Ready; otherwise they would never
// be handed out and Done would never report true.
//
// Before returning, it starts CleanLongRunningTasks in a goroutine and calls
// c.server().
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}
	c.ReduceBucketCount = nReduce

	fmt.Println("Coordinator: Adding map jobs")
	for _, file := range files {
		c.Jobs = append(c.Jobs, Job{
			Id:       newJobId(),
			JobFile:  file,
			Status:   Ready,
			JobsType: Map,
		})
	}

	// No map jobs means nothing will ever promote the reduce jobs.
	reduceStatus := WaitingForMap
	if len(files) == 0 {
		reduceStatus = Ready
	}

	fmt.Println("Coordinator: Adding reduce jobs")
	for bucket := 0; bucket < nReduce; bucket++ {
		c.Jobs = append(c.Jobs, Job{
			Id:           newJobId(),
			BucketNumber: bucket,
			Status:       reduceStatus,
			JobsType:     Reduce,
		})
	}

	go c.CleanLongRunningTasks()
	c.server()
	return &c
}
44 changes: 29 additions & 15 deletions src/mr/rpc.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,43 @@
package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

// GetTaskArgs is the argument of the Coordinator.GetTask RPC.
// WorkerId identifies the requesting worker; GetTask records it on the job
// it hands out.
type GetTaskArgs struct {
WorkerId string
}

// TaskReply is the payload carried by GetTaskReply. GetTask stores one of
// MapTaskReply, ReduceTaskReply, ExitTaskReply or WaitTaskReply in it.
type TaskReply interface{}

// NOTE(review): the ExampleArgs declaration below is never closed before
// MapTaskReply starts. This looks like pre-change and post-change diff lines
// rendered together -- confirm against the real file.
type ExampleArgs struct {
X int
// MapTaskReply is what GetTask returns for a map job: the job's Id, its input
// file, and the coordinator's ReduceBucketCount as BucketCount.
type MapTaskReply struct {
JobId string
JobFile string
BucketCount int
}

// NOTE(review): the ExampleReply declaration below is never closed before
// ReduceTaskReply starts. This looks like pre-change and post-change diff
// lines rendered together -- confirm against the real file.
type ExampleReply struct {
Y int
// ReduceTaskReply is what GetTask returns for a reduce job: the job's bucket
// number and its Id.
type ReduceTaskReply struct {
BucketNumber int
JobId string
}

// ExitTaskReply is returned by GetTask when every job is Completed, i.e. the
// coordinator has finished the entire job; it tells the worker to exit.
type ExitTaskReply struct {
}

// WaitTaskReply is returned by GetTask when some jobs are not yet Completed
// but none is currently Ready to be handed out.
type WaitTaskReply struct {
}

// GetTaskReply is the reply of the Coordinator.GetTask RPC. It embeds the
// TaskReply interface; the concrete types GetTask stores in it are the ones
// registered with gob in the coordinator's init function.
type GetTaskReply struct {
TaskReply
}

// JobFinishArgs is the argument of the Coordinator.JobFinish RPC.
// JobId is matched against Job.Id to find the job to mark Completed.
type JobFinishArgs struct {
JobId string
}

// JobFinishReply is the reply of the Coordinator.JobFinish RPC; it has no
// fields and JobFinish does not write to it.
type JobFinishReply struct {
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
Expand Down
Loading