From 3f80437f71b5081259697b7230949dc3f5ecbd4d Mon Sep 17 00:00:00 2001 From: mnnaegel Date: Tue, 30 Apr 2024 18:54:38 -0400 Subject: [PATCH 01/10] Checkpoint: example RPC from worker to master --- .gitignore | 1 + src/main/test-mr.sh | 2 +- src/mr/worker.go | 15 +-------------- 3 files changed, 3 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index a209ee8..eabbdf7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ pkg/ api.key .api.key.trimmed *-handin.tar.gz +.idea/ \ No newline at end of file diff --git a/src/main/test-mr.sh b/src/main/test-mr.sh index 210019f..8cf93f3 100644 --- a/src/main/test-mr.sh +++ b/src/main/test-mr.sh @@ -5,7 +5,7 @@ # # un-comment this to run the tests with the Go race detector. -# RACE=-race +RACE=-race if [[ "$OSTYPE" = "darwin"* ]] then diff --git a/src/mr/worker.go b/src/mr/worker.go index aaa8b64..a06ee0a 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -5,44 +5,33 @@ import "log" import "net/rpc" import "hash/fnv" - -// // Map functions return a slice of KeyValue. -// type KeyValue struct { Key string Value string } -// // use ihash(key) % NReduce to choose the reduce // task number for each KeyValue emitted by Map. -// func ihash(key string) int { h := fnv.New32a() h.Write([]byte(key)) return int(h.Sum32() & 0x7fffffff) } - -// // main/mrworker.go calls this function. -// func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { - // Your worker implementation here. // uncomment to send the Example RPC to the coordinator. - // CallExample() + CallExample() } -// // example function to show how to make an RPC call to the coordinator. // // the RPC argument and reply types are defined in rpc.go. -// func CallExample() { // declare an argument structure. @@ -67,11 +56,9 @@ func CallExample() { } } -// // send an RPC request to the coordinator, wait for the response. // usually returns true. // returns false if something goes wrong. -// func call(rpcname string, args interface{}, reply interface{}) bool { // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234") sockname := coordinatorSock() From b13630db74c04ea283e0ab5cb158df69d2a73e9d Mon Sep 17 00:00:00 2001 From: mnnaegel Date: Tue, 30 Apr 2024 19:29:28 -0400 Subject: [PATCH 02/10] Checkpoint: Add RPC for workers to get a file name --- src/mr/coordinator.go | 35 +++++++++++++---------------------- src/mr/rpc.go | 21 +++------------------ src/mr/worker.go | 29 ++++++----------------------- 3 files changed, 22 insertions(+), 63 deletions(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index cafda57..3d794b9 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -1,33 +1,30 @@ package mr -import "log" +import ( + "log" + "strings" +) import "net" import "os" import "net/rpc" import "net/http" - type Coordinator struct { - // Your definitions here. - } -// Your code here -- RPC handlers for the worker to call. - -// -// an example RPC handler. -// -// the RPC argument and reply types are defined in rpc.go. -// -func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error { - reply.Y = args.X + 1 +func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error { + // get all files with suffix .txt and return the first file + var files []string + for _, file := range os.Args { + if strings.HasSuffix(file, ".txt") { + files = append(files, file) + } + } + reply.TaskFile = files[0] return nil } - -// // start a thread that listens for RPCs from worker.go -// func (c *Coordinator) server() { rpc.Register(c) rpc.HandleHTTP() @@ -41,30 +38,24 @@ func (c *Coordinator) server() { go http.Serve(l, nil) } -// // main/mrcoordinator.go calls Done() periodically to find out // if the entire job has finished. -// func (c *Coordinator) Done() bool { ret := false // Your code here. - return ret } -// // create a Coordinator. // main/mrcoordinator.go calls this function. // nReduce is the number of reduce tasks to use. -// func MakeCoordinator(files []string, nReduce int) *Coordinator { c := Coordinator{} // Your code here. - c.server() return &c } diff --git a/src/mr/rpc.go b/src/mr/rpc.go index 1f15466..b038860 100644 --- a/src/mr/rpc.go +++ b/src/mr/rpc.go @@ -1,30 +1,15 @@ package mr -// -// RPC definitions. -// -// remember to capitalize all names. -// - import "os" import "strconv" -// -// example to show how to declare the arguments -// and reply for an RPC. -// - -type ExampleArgs struct { - X int +type GetTaskArgs struct { } -type ExampleReply struct { - Y int +type GetTaskReply struct { + TaskFile string } -// Add your RPC definitions here. - - // Cook up a unique-ish UNIX-domain socket name // in /var/tmp, for the coordinator. // Can't use the current directory since diff --git a/src/mr/worker.go b/src/mr/worker.go index a06ee0a..107b682 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -25,32 +25,15 @@ func Worker(mapf func(string, string) []KeyValue, // Your worker implementation here. // uncomment to send the Example RPC to the coordinator. - CallExample() - + CallGetTask() } -// example function to show how to make an RPC call to the coordinator. -// -// the RPC argument and reply types are defined in rpc.go. -func CallExample() { - - // declare an argument structure. - args := ExampleArgs{} - - // fill in the argument(s). - args.X = 99 - - // declare a reply structure. - reply := ExampleReply{} - - // send the RPC request, wait for the reply. - // the "Coordinator.Example" tells the - // receiving server that we'd like to call - // the Example() method of struct Coordinator. - ok := call("Coordinator.Example", &args, &reply) +func CallGetTask() { + args := GetTaskArgs{} + reply := GetTaskReply{} + ok := call("Coordinator.GetTask", &args, &reply) if ok { - // reply.Y should be 100. - fmt.Printf("reply.Y %v\n", reply.Y) + fmt.Printf("reply.TaskFile %v\n", reply.TaskFile) } else { fmt.Printf("call failed!\n") } From 5e01124c5ab867eb740196e97cc859af4c30855e Mon Sep 17 00:00:00 2001 From: mnnaegel Date: Tue, 30 Apr 2024 19:56:05 -0400 Subject: [PATCH 03/10] Checkpoint: Prepare initial state for the coordinator --- src/mr/coordinator.go | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index 3d794b9..96a4309 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -1,15 +1,42 @@ package mr import ( + "fmt" "log" "strings" + "sync" ) import "net" import "os" import "net/rpc" import "net/http" +type TaskStatus int +type TaskType int + +const ( + Ready TaskStatus = iota + InProgress + Completed +) + +const ( + Map TaskType = iota + Reduce +) + +type Task struct { + TaskFile string + Status TaskStatus + WorkerId string + OutputFiles map[int]string // reduce task bucket number -> output file + TaskType TaskType +} + type Coordinator struct { + Tasks []Task + Mutex sync.Mutex + ReduceBucketCount int } func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error { @@ -54,7 +81,16 @@ func (c *Coordinator) Done() bool { func MakeCoordinator(files []string, nReduce int) *Coordinator { c := Coordinator{} - // Your code here. + c.ReduceBucketCount = nReduce + + for _, file := range files { + fmt.Println("Adding task", file) + c.Tasks = append(c.Tasks, Task{ + TaskFile: file, + Status: Ready, + TaskType: Map, + }) + } c.server() return &c From f2fdbfe62ff191741c109dccb95f8b9da08fa7d5 Mon Sep 17 00:00:00 2001 From: mnnaegel Date: Tue, 30 Apr 2024 22:23:24 -0400 Subject: [PATCH 04/10] Checkpoint: Modify GetTask RPC to be more realistic --- src/mr/coordinator.go | 25 +++++++++++++++---------- src/mr/rpc.go | 1 + src/mr/worker.go | 29 +++++++++++++++++++++++++---- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index 96a4309..e80c181 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -3,7 +3,6 @@ package mr import ( "fmt" "log" - "strings" "sync" ) import "net" @@ -37,17 +36,25 @@ type Coordinator struct { Tasks []Task Mutex sync.Mutex ReduceBucketCount int + ReadyToReduce bool } func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error { - // get all files with suffix .txt and return the first file - var files []string - for _, file := range os.Args { - if strings.HasSuffix(file, ".txt") { - files = append(files, file) + c.Mutex.Lock() + defer c.Mutex.Unlock() + + var readyTask *Task + for i, task := range c.Tasks { + if task.Status == Ready && (c.ReadyToReduce || task.TaskType == Map) { + readyTask = &c.Tasks[i] + break } } - reply.TaskFile = files[0] + + readyTask.Status = InProgress + readyTask.WorkerId = args.WorkerId + + reply.TaskFile = readyTask.TaskFile return nil } @@ -75,13 +82,11 @@ func (c *Coordinator) Done() bool { return ret } -// create a Coordinator. -// main/mrcoordinator.go calls this function. -// nReduce is the number of reduce tasks to use. func MakeCoordinator(files []string, nReduce int) *Coordinator { c := Coordinator{} c.ReduceBucketCount = nReduce + c.ReadyToReduce = false // should be defaulted to false, but this is performed in case it isn't for _, file := range files { fmt.Println("Adding task", file) diff --git a/src/mr/rpc.go b/src/mr/rpc.go index b038860..d65ac51 100644 --- a/src/mr/rpc.go +++ b/src/mr/rpc.go @@ -4,6 +4,7 @@ import "os" import "strconv" type GetTaskArgs struct { + WorkerId string } type GetTaskReply struct { diff --git a/src/mr/worker.go b/src/mr/worker.go index 107b682..930d5ef 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -1,6 +1,10 @@ package mr -import "fmt" +import ( + "crypto/rand" + "encoding/base64" + "fmt" +) import "log" import "net/rpc" import "hash/fnv" @@ -19,17 +23,34 @@ func ihash(key string) int { return int(h.Sum32() & 0x7fffffff) } +type WorkerState struct { + WorkerId string +} + +var workerState WorkerState + +func init() { + workerState.WorkerId = newWorkerId() +} + +func newWorkerId() string { + b := make([]byte, 16) + _, err := rand.Read(b) + if err != nil { + log.Fatal(err) + } + return base64.URLEncoding.EncodeToString(b) +} + // main/mrworker.go calls this function. func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { - // Your worker implementation here. - // uncomment to send the Example RPC to the coordinator. CallGetTask() } func CallGetTask() { - args := GetTaskArgs{} + args := GetTaskArgs{WorkerId: workerState.WorkerId} reply := GetTaskReply{} ok := call("Coordinator.GetTask", &args, &reply) if ok { From ca76f08287fd4313a02c38d673e6d3680364cb95 Mon Sep 17 00:00:00 2001 From: mnnaegel Date: Wed, 1 May 2024 00:15:52 -0400 Subject: [PATCH 05/10] Checkpoint: Add ability for workers to generate intermediate files --- src/mr/coordinator.go | 5 +++ src/mr/worker.go | 90 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index e80c181..f761dc3 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -51,6 +51,11 @@ func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error { } } + if readyTask == nil { + reply.TaskFile = "" + return nil + } + readyTask.Status = InProgress readyTask.WorkerId = args.WorkerId diff --git a/src/mr/worker.go b/src/mr/worker.go index 930d5ef..82e2a5b 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -4,8 +4,14 @@ import ( "crypto/rand" "encoding/base64" "fmt" + "io/ioutil" + "os" + "sort" + "strconv" + "strings" ) import "log" +import "time" import "net/rpc" import "hash/fnv" @@ -23,6 +29,13 @@ func ihash(key string) int { return int(h.Sum32() & 0x7fffffff) } +type ByKey []KeyValue + +// for sorting by key. +func (a ByKey) Len() int { return len(a) } +func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key } + type WorkerState struct { WorkerId string } @@ -42,14 +55,85 @@ func newWorkerId() string { return base64.URLEncoding.EncodeToString(b) } +func getFileContents(filename string) string { + file, err := os.Open(filename) + defer file.Close() + if err != nil { + log.Fatalf("cannot open %v", filename) + } + contents, err := ioutil.ReadAll(file) + if err != nil { + log.Fatalf("cannot read %v", filename) + } + file.Close() + + return string(contents) +} + // main/mrworker.go calls this function. func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { + startTime := time.Now() + currentDir, err := os.Getwd() + if err != nil { + log.Fatalf("cannot get current directory") + } + fileToProcess := CallGetTask() + + contents := getFileContents(fileToProcess) + intermediateKvpArray := mapf(fileToProcess, contents) - CallGetTask() + // sort by keys + sort.Sort(ByKey(intermediateKvpArray)) + + // create 10 temporary files with os.CreateTemp + tempBuckets := make([]*os.File, 10) + for i := 0; i < 10; i++ { + tempBuckets[i], err = os.CreateTemp(currentDir+"/tmp", "mr-tmp-"+workerState.WorkerId+"-"+fileToProcess) + if err != nil { + log.Fatalf("cannot create temp file") + } + } + + // create groups of intermediate key value pairs for each reduce task based on the key + currentGroup := make([]KeyValue, 0) + bucketsData := make([]strings.Builder, 10) + i := 0 + for i < len(intermediateKvpArray) { + j := i + 1 + for j < len(intermediateKvpArray) && intermediateKvpArray[j].Key == intermediateKvpArray[i].Key { + j++ + } + currentGroup = append(currentGroup, intermediateKvpArray[i:j]...) + + // get all kvps into one string + var groupData strings.Builder + for _, kvp := range currentGroup { + groupData.WriteString(fmt.Sprintf("%v %v\n", kvp.Key, kvp.Value)) + } + bucketsData[ihash(intermediateKvpArray[i].Key)%10].WriteString(groupData.String()) + i = j + } + + // write to temp files + for i, tempFileContents := range bucketsData { + _, err := tempBuckets[i].WriteString(tempFileContents.String()) + if err != nil { + log.Fatalf("cannot write to temp file") + } + } + + for i, tempFile := range tempBuckets { + err := os.Rename(tempFile.Name(), currentDir+"/mr-out-"+fileToProcess+"-"+strconv.Itoa(i)) + if err != nil { + log.Fatalf("Failed renaming temp file %v, error: %v", tempFile.Name(), err) + } + } + + fmt.Println("Total time: ", time.Since(startTime)) } -func CallGetTask() { +func CallGetTask() string { args := GetTaskArgs{WorkerId: workerState.WorkerId} reply := GetTaskReply{} ok := call("Coordinator.GetTask", &args, &reply) @@ -58,6 +142,8 @@ func CallGetTask() { } else { fmt.Printf("call failed!\n") } + + return reply.TaskFile } // send an RPC request to the coordinator, wait for the response. From bcb66f2aa3d0447fa71ed7209c6a97d8f5115b3d Mon Sep 17 00:00:00 2001 From: mnnaegel Date: Wed, 1 May 2024 19:49:25 -0400 Subject: [PATCH 06/10] Checkpoint: Fast worker with JSON encodings for intermediate files rather than using stringBuilders --- src/mr/worker.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/mr/worker.go b/src/mr/worker.go index 82e2a5b..b1253d8 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -3,12 +3,12 @@ package mr import ( "crypto/rand" "encoding/base64" + "encoding/json" "fmt" "io/ioutil" "os" "sort" "strconv" - "strings" ) import "log" import "time" @@ -87,43 +87,43 @@ func Worker(mapf func(string, string) []KeyValue, sort.Sort(ByKey(intermediateKvpArray)) // create 10 temporary files with os.CreateTemp - tempBuckets := make([]*os.File, 10) + temporaryIntermediateFiles := make([]*os.File, 10) for i := 0; i < 10; i++ { - tempBuckets[i], err = os.CreateTemp(currentDir+"/tmp", "mr-tmp-"+workerState.WorkerId+"-"+fileToProcess) + temporaryIntermediateFiles[i], err = os.CreateTemp(currentDir+"/tmp", "mr-tmp-"+workerState.WorkerId+"-"+fileToProcess) if err != nil { log.Fatalf("cannot create temp file") } } - // create groups of intermediate key value pairs for each reduce task based on the key - currentGroup := make([]KeyValue, 0) - bucketsData := make([]strings.Builder, 10) + // bucket the kvps + bucketsData := make([][]KeyValue, 10) i := 0 for i < len(intermediateKvpArray) { j := i + 1 for j < len(intermediateKvpArray) && intermediateKvpArray[j].Key == intermediateKvpArray[i].Key { j++ } - currentGroup = append(currentGroup, intermediateKvpArray[i:j]...) - - // get all kvps into one string - var groupData strings.Builder - for _, kvp := range currentGroup { - groupData.WriteString(fmt.Sprintf("%v %v\n", kvp.Key, kvp.Value)) - } - bucketsData[ihash(intermediateKvpArray[i].Key)%10].WriteString(groupData.String()) + bucketNumber := ihash(intermediateKvpArray[i].Key) % 10 + bucketsData[bucketNumber] = append(bucketsData[bucketNumber], intermediateKvpArray[i:j]...) i = j } + fmt.Println("Finished grouping KVPs into buckets: ", time.Since(startTime)) + // write to temp files for i, tempFileContents := range bucketsData { - _, err := tempBuckets[i].WriteString(tempFileContents.String()) + // write to temp file json of kvps + enc := json.NewEncoder(temporaryIntermediateFiles[i]) + err := enc.Encode(&tempFileContents) if err != nil { - log.Fatalf("cannot write to temp file") + log.Fatalf("cannot encode json") } + temporaryIntermediateFiles[i].Close() } - for i, tempFile := range tempBuckets { + fmt.Println("Finished writing temp files: ", time.Since(startTime)) + + for i, tempFile := range temporaryIntermediateFiles { err := os.Rename(tempFile.Name(), currentDir+"/mr-out-"+fileToProcess+"-"+strconv.Itoa(i)) if err != nil { log.Fatalf("Failed renaming temp file %v, error: %v", tempFile.Name(), err) From 8cf515c21edc874741c6e72a9fa171a209925d2a Mon Sep 17 00:00:00 2001 From: mnnaegel Date: Wed, 1 May 2024 22:00:37 -0400 Subject: [PATCH 07/10] Checkpoint: Worker now processes different tasks besides only map tasks and does so in a loop --- src/mr/coordinator.go | 85 ++++++++++++++++++++++++++------------- src/mr/rpc.go | 21 +++++++++- src/mr/worker.go | 93 ++++++++++++++++++++++++++++++------------- 3 files changed, 143 insertions(+), 56 deletions(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index f761dc3..df7604c 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -1,6 +1,7 @@ package mr import ( + "encoding/gob" "fmt" "log" "sync" @@ -10,56 +11,89 @@ import "os" import "net/rpc" import "net/http" -type TaskStatus int -type TaskType int +type JobStatus int +type JobsType int const ( - Ready TaskStatus = iota + Ready JobStatus = iota + WaitingForMap InProgress Completed ) const ( - Map TaskType = iota + Map JobsType = iota Reduce ) -type Task struct { - TaskFile string - Status TaskStatus - WorkerId string - OutputFiles map[int]string // reduce task bucket number -> output file - TaskType TaskType +type Job struct { + BucketNumber int + JobFile string + WorkerId string + Status JobStatus + JobsType JobsType } type Coordinator struct { - Tasks []Task + Jobs []Job Mutex sync.Mutex ReduceBucketCount int - ReadyToReduce bool +} + +func init() { + gob.Register(MapTaskReply{}) + gob.Register(ReduceTaskReply{}) + gob.Register(ExitTaskReply{}) + gob.Register(WaitTaskReply{}) } func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error { c.Mutex.Lock() defer c.Mutex.Unlock() - var readyTask *Task - for i, task := range c.Tasks { - if task.Status == Ready && (c.ReadyToReduce || task.TaskType == Map) { - readyTask = &c.Tasks[i] + // if all tasks are completed, tell the worker to exit + var allCompleted = true + for _, task := range c.Jobs { + if task.Status != Completed { + allCompleted = false break } } + if allCompleted { + reply.TaskReply = ExitTaskReply{} + return nil + } - if readyTask == nil { - reply.TaskFile = "" + var readyJob *Job + for i, task := range c.Jobs { + if task.Status == Ready { + readyJob = &c.Jobs[i] + break + } + } + + // if no tasks to assign, make the worker wait + if readyJob == nil { + reply.TaskReply = WaitTaskReply{} return nil } - readyTask.Status = InProgress - readyTask.WorkerId = args.WorkerId + readyJob.WorkerId = args.WorkerId + readyJob.Status = InProgress + + // switch for the readyJob's reflected type + switch readyJob.JobsType { + case Map: + reply.TaskReply = MapTaskReply{ + JobFile: readyJob.JobFile, + BucketCount: c.ReduceBucketCount, + } + case Reduce: + reply.TaskReply = ReduceTaskReply{ + BucketNumber: readyJob.BucketNumber, + } + } - reply.TaskFile = readyTask.TaskFile return nil } @@ -82,8 +116,6 @@ func (c *Coordinator) server() { func (c *Coordinator) Done() bool { ret := false - // Your code here. - return ret } @@ -91,14 +123,13 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator { c := Coordinator{} c.ReduceBucketCount = nReduce - c.ReadyToReduce = false // should be defaulted to false, but this is performed in case it isn't for _, file := range files { fmt.Println("Adding task", file) - c.Tasks = append(c.Tasks, Task{ - TaskFile: file, + c.Jobs = append(c.Jobs, Job{ + JobFile: file, Status: Ready, - TaskType: Map, + JobsType: Map, }) } diff --git a/src/mr/rpc.go b/src/mr/rpc.go index d65ac51..fc2dd8e 100644 --- a/src/mr/rpc.go +++ b/src/mr/rpc.go @@ -7,8 +7,27 @@ type GetTaskArgs struct { WorkerId string } +type TaskReply interface{} + +type MapTaskReply struct { + JobFile string + BucketCount int +} + +type ReduceTaskReply struct { + BucketNumber int +} + +// ExitTaskReply when the coordinator has finished the entire job, tells the worker to exit +type ExitTaskReply struct { +} + +// WaitTaskReply when there are no tasks to be done +type WaitTaskReply struct { +} + type GetTaskReply struct { - TaskFile string + TaskReply } // Cook up a unique-ish UNIX-domain socket name diff --git a/src/mr/worker.go b/src/mr/worker.go index b1253d8..644bb39 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -9,9 +9,9 @@ import ( "os" "sort" "strconv" + "time" ) import "log" -import "time" import "net/rpc" import "hash/fnv" @@ -21,7 +21,7 @@ type KeyValue struct { Value string } -// use ihash(key) % NReduce to choose the reduce +// use ihash(key) % BucketCount to choose the reduce // task number for each KeyValue emitted by Map. func ihash(key string) int { h := fnv.New32a() @@ -70,49 +70,43 @@ func getFileContents(filename string) string { return string(contents) } -// main/mrworker.go calls this function. -func Worker(mapf func(string, string) []KeyValue, - reducef func(string, []string) string) { - startTime := time.Now() +func handleMapTask(mapf func(string, string) []KeyValue, reply MapTaskReply) { currentDir, err := os.Getwd() if err != nil { log.Fatalf("cannot get current directory") } - fileToProcess := CallGetTask() + + fileToProcess := reply.JobFile + bucketsToCreate := reply.BucketCount contents := getFileContents(fileToProcess) intermediateKvpArray := mapf(fileToProcess, contents) - - // sort by keys sort.Sort(ByKey(intermediateKvpArray)) - // create 10 temporary files with os.CreateTemp - temporaryIntermediateFiles := make([]*os.File, 10) - for i := 0; i < 10; i++ { + // Create the temporary files to store results of the map phase + temporaryIntermediateFiles := make([]*os.File, bucketsToCreate) + for i := 0; i < len(temporaryIntermediateFiles); i++ { temporaryIntermediateFiles[i], err = os.CreateTemp(currentDir+"/tmp", "mr-tmp-"+workerState.WorkerId+"-"+fileToProcess) if err != nil { log.Fatalf("cannot create temp file") } } - // bucket the kvps - bucketsData := make([][]KeyValue, 10) + // Put KVPs into respective buckets based on hash of the key + bucketsData := make([][]KeyValue, bucketsToCreate) i := 0 for i < len(intermediateKvpArray) { j := i + 1 for j < len(intermediateKvpArray) && intermediateKvpArray[j].Key == intermediateKvpArray[i].Key { j++ } - bucketNumber := ihash(intermediateKvpArray[i].Key) % 10 + bucketNumber := ihash(intermediateKvpArray[i].Key) % bucketsToCreate bucketsData[bucketNumber] = append(bucketsData[bucketNumber], intermediateKvpArray[i:j]...) i = j } - fmt.Println("Finished grouping KVPs into buckets: ", time.Since(startTime)) - - // write to temp files + // Write JSON intermediate KVPs to temp files for i, tempFileContents := range bucketsData { - // write to temp file json of kvps enc := json.NewEncoder(temporaryIntermediateFiles[i]) err := enc.Encode(&tempFileContents) if err != nil { @@ -121,29 +115,72 @@ func Worker(mapf func(string, string) []KeyValue, temporaryIntermediateFiles[i].Close() } - fmt.Println("Finished writing temp files: ", time.Since(startTime)) - + // Atomic rename temp files to final files for i, tempFile := range temporaryIntermediateFiles { err := os.Rename(tempFile.Name(), currentDir+"/mr-out-"+fileToProcess+"-"+strconv.Itoa(i)) if err != nil { log.Fatalf("Failed renaming temp file %v, error: %v", tempFile.Name(), err) } } +} + +func handleExitTask() { + os.Exit(0) +} - fmt.Println("Total time: ", time.Since(startTime)) +func handleWaitTask() { + time.Sleep(time.Second) } -func CallGetTask() string { +func handleReduceTask(reducef func(string, []string) string, reply ReduceTaskReply) { + panic("Not implemented reduce handler yet") +} + +// main/mrworker.go calls this function. +func Worker(mapf func(string, string) []KeyValue, + reducef func(string, []string) string) { + for { + assignedTask := CallGetTask() + + switch assignedTask.(type) { + case MapTaskReply: + fmt.Println("Map task assigned") + handleMapTask(mapf, assignedTask.(MapTaskReply)) + fmt.Println("Map task completed") + case ReduceTaskReply: + fmt.Println("Reduce task assigned") + handleReduceTask(reducef, assignedTask.(ReduceTaskReply)) + fmt.Println("Reduce task completed") + case ExitTaskReply: + fmt.Println("Exit task assigned") + handleExitTask() + case WaitTaskReply: + fmt.Println("Wait task assigned") + handleWaitTask() + } + } +} + +func CallGetTask() TaskReply { args := GetTaskArgs{WorkerId: workerState.WorkerId} reply := GetTaskReply{} ok := call("Coordinator.GetTask", &args, &reply) - if ok { - fmt.Printf("reply.TaskFile %v\n", reply.TaskFile) - } else { - fmt.Printf("call failed!\n") + if !ok { + panic("Failed to get task") } - return reply.TaskFile + switch reply.TaskReply.(type) { + case MapTaskReply: + return reply.TaskReply.(MapTaskReply) + case ReduceTaskReply: + return reply.TaskReply.(ReduceTaskReply) + case ExitTaskReply: + return reply.TaskReply.(ExitTaskReply) + case WaitTaskReply: + return reply.TaskReply.(WaitTaskReply) + default: + panic("Unknown task type") + } } // send an RPC request to the coordinator, wait for the response. From 92ff47608f44043b8a09c49e8f649d2ffad04671 Mon Sep 17 00:00:00 2001 From: mnnaegel Date: Wed, 1 May 2024 22:46:01 -0400 Subject: [PATCH 08/10] Checkpoint: Add new RPC for signaling to master that job has finished and adjust handlers on master and make call on client --- src/mr/coordinator.go | 81 +++++++++++++++++++++++++++++++++++++++++-- src/mr/rpc.go | 10 ++++++ src/mr/worker.go | 15 ++++++-- 3 files changed, 101 insertions(+), 5 deletions(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index df7604c..83957d3 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -1,6 +1,8 @@ package mr import ( + "crypto/rand" + "encoding/base64" "encoding/gob" "fmt" "log" @@ -27,6 +29,7 @@ const ( ) type Job struct { + Id string BucketNumber int JobFile string WorkerId string @@ -87,10 +90,53 @@ func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error { reply.TaskReply = MapTaskReply{ JobFile: readyJob.JobFile, BucketCount: c.ReduceBucketCount, + JobId: readyJob.Id, } case Reduce: reply.TaskReply = ReduceTaskReply{ BucketNumber: readyJob.BucketNumber, + JobFile: readyJob.JobFile, + JobId: readyJob.Id, + } + } + + return nil +} + +func (c *Coordinator) JobFinish(args *JobFinishArgs, reply *JobFinishReply) error { + c.Mutex.Lock() + defer c.Mutex.Unlock() + + for i, job := range c.Jobs { + if job.Id == args.JobId { + // if the recently completed job is a map job and all map jobs are now complete, mark reduce jobs as ready + if job.Status == Completed { + break + } + + if job.JobsType == Map { + var allMapJobsCompleted = true + for _, mapJob := range c.Jobs { + if mapJob.Id == job.Id { + continue + } + + if mapJob.JobsType == Map && mapJob.Status != Completed { + allMapJobsCompleted = false + break + } + } + + if allMapJobsCompleted { + for j, reduceJob := range c.Jobs { + if reduceJob.JobsType == Reduce && reduceJob.Status == WaitingForMap { + c.Jobs[j].Status = Ready + } + } + } + } + c.Jobs[i].Status = Completed + break } } @@ -111,12 +157,30 @@ func (c *Coordinator) server() { go http.Serve(l, nil) } +func newJobId() string { + b := make([]byte, 16) + _, err := rand.Read(b) + if err != nil { + log.Fatal(err) + } + return base64.URLEncoding.EncodeToString(b) +} + // main/mrcoordinator.go calls Done() periodically to find out // if the entire job has finished. func (c *Coordinator) Done() bool { - ret := false + c.Mutex.Lock() + defer c.Mutex.Unlock() - return ret + allCompleted := true + for _, task := range c.Jobs { + if task.Status != Completed { + allCompleted = false + break + } + } + + return allCompleted } func MakeCoordinator(files []string, nReduce int) *Coordinator { @@ -125,14 +189,25 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator { c.ReduceBucketCount = nReduce for _, file := range files { - fmt.Println("Adding task", file) + fmt.Println("Adding map job for ", file) c.Jobs = append(c.Jobs, Job{ + Id: newJobId(), JobFile: file, Status: Ready, JobsType: Map, }) } + for bucket := 0; bucket < nReduce; bucket++ { + fmt.Println("Adding reduce job for bucket ", bucket) + c.Jobs = append(c.Jobs, Job{ + Id: newJobId(), + BucketNumber: bucket, + Status: WaitingForMap, + JobsType: Reduce, + }) + } + c.server() return &c } diff --git a/src/mr/rpc.go b/src/mr/rpc.go index fc2dd8e..4cc7fa9 100644 --- a/src/mr/rpc.go +++ b/src/mr/rpc.go @@ -10,12 +10,15 @@ type GetTaskArgs struct { type TaskReply interface{} type MapTaskReply struct { + JobId string JobFile string BucketCount int } type ReduceTaskReply struct { BucketNumber int + JobId string + JobFile string } // ExitTaskReply when the coordinator has finished the entire job, tells the worker to exit @@ -30,6 +33,13 @@ type GetTaskReply struct { TaskReply } +type JobFinishArgs struct { + JobId string +} + +type JobFinishReply struct { +} + // Cook up a unique-ish UNIX-domain socket name // in /var/tmp, for the coordinator. // Can't use the current directory since diff --git a/src/mr/worker.go b/src/mr/worker.go index 644bb39..1e3edc5 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -144,13 +144,15 @@ func Worker(mapf func(string, string) []KeyValue, switch assignedTask.(type) { case MapTaskReply: - fmt.Println("Map task assigned") + fmt.Println("Map task assigned, file: ", assignedTask.(MapTaskReply).JobFile) handleMapTask(mapf, assignedTask.(MapTaskReply)) fmt.Println("Map task completed") + CallJobFinish(assignedTask.(MapTaskReply).JobId) case ReduceTaskReply: - fmt.Println("Reduce task assigned") + fmt.Println("Reduce task assigned, bucket: ", assignedTask.(ReduceTaskReply).BucketNumber) handleReduceTask(reducef, assignedTask.(ReduceTaskReply)) fmt.Println("Reduce task completed") + CallJobFinish(assignedTask.(ReduceTaskReply).JobId) case ExitTaskReply: fmt.Println("Exit task assigned") handleExitTask() @@ -161,6 +163,15 @@ func Worker(mapf func(string, string) []KeyValue, } } +func CallJobFinish(jobId string) { + args := JobFinishArgs{JobId: jobId} + reply := JobFinishReply{} + ok := call("Coordinator.JobFinish", &args, &reply) + if !ok { + panic("Failed to finish job") + } +} + func CallGetTask() TaskReply { args := GetTaskArgs{WorkerId: workerState.WorkerId} reply := GetTaskReply{} From 0d54fca12b00604ba4171caeb8f7ac7055f6a45e Mon Sep 17 00:00:00 2001 From: mnnaegel Date: Wed, 1 May 2024 23:26:06 -0400 Subject: [PATCH 09/10] Checkpoint: Add reduce handler --- src/mr/coordinator.go | 1 - src/mr/rpc.go | 1 - src/mr/worker.go | 79 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 77 insertions(+), 4 deletions(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index 83957d3..4139552 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -95,7 +95,6 @@ func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error { case Reduce: reply.TaskReply = ReduceTaskReply{ BucketNumber: readyJob.BucketNumber, - JobFile: readyJob.JobFile, JobId: readyJob.Id, } } diff --git a/src/mr/rpc.go b/src/mr/rpc.go index 4cc7fa9..94c21b5 100644 --- a/src/mr/rpc.go +++ b/src/mr/rpc.go @@ -18,7 +18,6 @@ type MapTaskReply struct { type ReduceTaskReply struct { BucketNumber int JobId string - JobFile string } // ExitTaskReply when the coordinator has finished the entire job, tells the worker to exit diff --git a/src/mr/worker.go b/src/mr/worker.go index 1e3edc5..1980889 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -7,8 +7,10 @@ import ( "fmt" "io/ioutil" "os" + "path/filepath" "sort" "strconv" + "strings" "time" ) import "log" @@ -117,7 +119,14 @@ func handleMapTask(mapf func(string, string) []KeyValue, reply MapTaskReply) { // Atomic rename temp files to final files for i, tempFile := range temporaryIntermediateFiles { - err := os.Rename(tempFile.Name(), currentDir+"/mr-out-"+fileToProcess+"-"+strconv.Itoa(i)) + // Create the directory if it doesn't already exist + finalFileDir := filepath.Dir(filepath.Join(currentDir, "intermediate", fileToProcess, strconv.Itoa(i))) + err := os.MkdirAll(finalFileDir, 0755) + if err != nil { + log.Fatalf("cannot create intermediate directory") + } + + err = os.Rename(tempFile.Name(), currentDir+"/intermediate/"+fileToProcess+"/"+strconv.Itoa(i)) if err != nil { log.Fatalf("Failed renaming temp file %v, error: %v", tempFile.Name(), err) } @@ -133,7 +142,73 @@ func handleWaitTask() { } func handleReduceTask(reducef func(string, []string) string, reply ReduceTaskReply) { - panic("Not implemented reduce handler yet") + intermediateKvps := make([]KeyValue, 0) + + currentDir, err := os.Getwd() + if err != nil { + log.Fatalf("cannot get current directory") + } + + intermediateDir := filepath.Join(currentDir, "intermediate") + desiredFile := strconv.Itoa(reply.BucketNumber) + err = filepath.Walk(intermediateDir, func(path string, info os.FileInfo, err error) error { + if !info.IsDir() && strings.HasSuffix(path, desiredFile) { + fmt.Println("Reading KVPs from intermediate file: ", path) + file, err := os.Open(path) + if err != nil { + log.Fatalf("cannot open %v", path) + } + + dec := json.NewDecoder(file) + for { + var kvps []KeyValue + if err := dec.Decode(&kvps); err != nil { + break + } + intermediateKvps = append(intermediateKvps, kvps...) + } + + file.Close() + } + return nil + }) + + // sort the intermediate kvps + sort.Sort(ByKey(intermediateKvps)) + + // create a temporary file to store the reduce output + tempFile, err := os.CreateTemp(currentDir+"/tmp", "mr-tmp-"+strconv.Itoa(reply.BucketNumber)) + if err != nil { + log.Fatalf("cannot create temp file") + } + + // call Reduce on each distinct key in intermediate[], and print the result to temp file + i := 0 + for i < len(intermediateKvps) { + j := i + 1 + for j < len(intermediateKvps) && intermediateKvps[j].Key == intermediateKvps[i].Key { + j++ + } + var values []string + for k := i; k < j; k++ { + values = append(values, intermediateKvps[k].Value) + } + output := reducef(intermediateKvps[i].Key, values) + fmt.Fprintf(tempFile, "%v %v\n", intermediateKvps[i].Key, output) + i = j + } + + tempFile.Close() + + fmt.Println("Writing temporary reduce output to file: ", tempFile.Name()) + + // Atomic rename temp file to final file + err = os.Rename(tempFile.Name(), currentDir+"/mr-out-"+strconv.Itoa(reply.BucketNumber)) + if err != nil { + log.Fatalf("Failed renaming temp file %v, error: %v", tempFile.Name(), err) + } + + fmt.Println("Reduce output written to file: ", currentDir+"/mr-out-"+strconv.Itoa(reply.BucketNumber)) } // main/mrworker.go calls this function. From 83a7947d6449fd0d8c6ae10fb17ecf7afeeed7a7 Mon Sep 17 00:00:00 2001 From: mnnaegel Date: Thu, 2 May 2024 13:45:24 -0400 Subject: [PATCH 10/10] =?UTF-8?q?Tests=20pass=20(=E2=9C=A6=20=E2=80=BF=20?= =?UTF-8?q?=E2=9C=A6)...=20Make=20program=20agnostic=20to=20working=20dire?= =?UTF-8?q?ctory=20to=20support=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/mr/coordinator.go | 26 +++++++++++++-- src/mr/worker.go | 73 ++++++++++++++++++++++++++++++++----------- 2 files changed, 78 insertions(+), 21 deletions(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index 4139552..91ec6e1 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -7,6 +7,7 @@ import ( "fmt" "log" "sync" + "time" ) import "net" import "os" @@ -35,6 +36,7 @@ type Job struct { WorkerId string Status JobStatus JobsType JobsType + StartTime time.Time } type Coordinator struct { @@ -83,6 +85,7 @@ func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error { readyJob.WorkerId = args.WorkerId readyJob.Status = InProgress + readyJob.StartTime = time.Now() // switch for the readyJob's reflected type switch readyJob.JobsType { @@ -182,13 +185,31 @@ func (c *Coordinator) Done() bool { return allCompleted } +func (c *Coordinator) CleanLongRunningTasks() { + clean := func() { + c.Mutex.Lock() + defer c.Mutex.Unlock() + + for i, task := range c.Jobs { + if task.Status == InProgress && time.Since(task.StartTime).Seconds() > 10 { + c.Jobs[i].Status = Ready + } + } + } + + for { + clean() + time.Sleep(time.Second) + } +} + func MakeCoordinator(files []string, nReduce int) *Coordinator { c := Coordinator{} c.ReduceBucketCount = nReduce + fmt.Println("Coordinator: Adding map jobs") for _, file := range files { - fmt.Println("Adding map job for ", file) c.Jobs = append(c.Jobs, Job{ Id: newJobId(), JobFile: file, @@ -197,8 +218,8 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator { }) } + fmt.Println("Coordinator: Adding reduce jobs") for bucket := 0; bucket < nReduce; bucket++ { - fmt.Println("Adding reduce job for bucket ", bucket) c.Jobs = append(c.Jobs, Job{ Id: newJobId(), BucketNumber: bucket, @@ -207,6 +228,7 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator { }) } + go c.CleanLongRunningTasks() c.server() return &c } diff --git a/src/mr/worker.go b/src/mr/worker.go index 1980889..182c0e7 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -1,6 +1,7 @@ package mr import ( + "context" "crypto/rand" "encoding/base64" "encoding/json" @@ -75,7 +76,7 @@ func getFileContents(filename string) string { func handleMapTask(mapf func(string, string) []KeyValue, reply MapTaskReply) { currentDir, err := os.Getwd() if err != nil { - log.Fatalf("cannot get current directory") + log.Fatalf("cannot get current directory, error: %v", err) } fileToProcess := reply.JobFile @@ -85,12 +86,23 @@ func handleMapTask(mapf func(string, string) []KeyValue, reply MapTaskReply) { intermediateKvpArray := mapf(fileToProcess, contents) sort.Sort(ByKey(intermediateKvpArray)) + // create tmp directory if it doesn't exist + tmpDir := filepath.Join(currentDir, "tmp") + err = os.MkdirAll(tmpDir, 0755) + if err != nil { + log.Fatalf("cannot create tmp directory, error: %v", err) + } + // Create the temporary files to store results of the map phase temporaryIntermediateFiles := make([]*os.File, bucketsToCreate) + fmt.Println("Sanity check... first key and value: ", intermediateKvpArray[0].Key, intermediateKvpArray[0].Value) for i := 0; i < len(temporaryIntermediateFiles); i++ { - temporaryIntermediateFiles[i], err = os.CreateTemp(currentDir+"/tmp", "mr-tmp-"+workerState.WorkerId+"-"+fileToProcess) + newFile, err := os.CreateTemp(tmpDir, "mr-tmp-*") + // print file path + fmt.Println("Temp file created: ", newFile.Name()) + temporaryIntermediateFiles[i] = newFile if err != nil { - log.Fatalf("cannot create temp file") + log.Fatalf("cannot create temp file, error: %v", err) } } @@ -120,13 +132,18 @@ func handleMapTask(mapf func(string, string) []KeyValue, reply MapTaskReply) { // Atomic rename temp files to final files for i, tempFile := range temporaryIntermediateFiles { // Create the directory if it doesn't already exist - finalFileDir := filepath.Dir(filepath.Join(currentDir, "intermediate", fileToProcess, strconv.Itoa(i))) + // filetoprocess can be in format ../filename.txt just get the final file name + fileName := filepath.Base(fileToProcess) + finalFilePath := filepath.Join(currentDir, "intermediate", fileName, strconv.Itoa(i)) + finalFileDir := filepath.Dir(finalFilePath) + fmt.Println("Final file path: ", finalFilePath) + fmt.Println("Final file dir: ", finalFileDir) err := os.MkdirAll(finalFileDir, 0755) if err != nil { - log.Fatalf("cannot create intermediate directory") + log.Fatalf("cannot create intermediate directory, error: %v", err) } - err = os.Rename(tempFile.Name(), currentDir+"/intermediate/"+fileToProcess+"/"+strconv.Itoa(i)) + err = os.Rename(tempFile.Name(), finalFilePath) if err != nil { log.Fatalf("Failed renaming temp file %v, error: %v", tempFile.Name(), err) } @@ -142,18 +159,28 @@ func handleWaitTask() { } func handleReduceTask(reducef func(string, []string) string, reply ReduceTaskReply) { + homeDir, err := os.Getwd() + if err != nil { + log.Fatalf("cannot get current directory, error: %v", err) + } + intermediateKvps := make([]KeyValue, 0) - currentDir, err := os.Getwd() + currentDir := homeDir + + // create intermediate directory if it doesn't exist + err = os.MkdirAll(filepath.Join(currentDir, "intermediate"), 0755) if err != nil { - log.Fatalf("cannot get current directory") + log.Fatalf("cannot create intermediate directory, error: %v", err) } intermediateDir := filepath.Join(currentDir, "intermediate") desiredFile := strconv.Itoa(reply.BucketNumber) err = filepath.Walk(intermediateDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + log.Fatalf("cannot walk %v, error: %v", path, err) + } if !info.IsDir() && strings.HasSuffix(path, desiredFile) { - fmt.Println("Reading KVPs from intermediate file: ", path) file, err := os.Open(path) if err != nil { log.Fatalf("cannot open %v", path) @@ -176,8 +203,9 @@ func handleReduceTask(reducef func(string, []string) string, reply ReduceTaskRep // sort the intermediate kvps sort.Sort(ByKey(intermediateKvps)) + tmpDir := filepath.Join(currentDir, "tmp") // create a temporary file to store the reduce output - tempFile, err := os.CreateTemp(currentDir+"/tmp", "mr-tmp-"+strconv.Itoa(reply.BucketNumber)) + tempFile, err := os.CreateTemp(tmpDir, "mr-out-*") if err != nil { log.Fatalf("cannot create temp file") } @@ -200,14 +228,17 @@ func handleReduceTask(reducef func(string, []string) string, reply ReduceTaskRep tempFile.Close() - fmt.Println("Writing temporary reduce output to file: ", tempFile.Name()) - // Atomic rename temp file to final file err = os.Rename(tempFile.Name(), currentDir+"/mr-out-"+strconv.Itoa(reply.BucketNumber)) if err != nil { log.Fatalf("Failed renaming temp file %v, error: %v", tempFile.Name(), err) } + err = os.Chmod(currentDir+"/mr-out-"+strconv.Itoa(reply.BucketNumber), 0644) + if err != nil { + log.Fatalf("Failed to change file permissions, error: %v", err) + } + fmt.Println("Reduce output written to file: ", currentDir+"/mr-out-"+strconv.Itoa(reply.BucketNumber)) } @@ -221,12 +252,10 @@ func Worker(mapf func(string, string) []KeyValue, case MapTaskReply: fmt.Println("Map task assigned, file: ", assignedTask.(MapTaskReply).JobFile) handleMapTask(mapf, assignedTask.(MapTaskReply)) - fmt.Println("Map task completed") CallJobFinish(assignedTask.(MapTaskReply).JobId) case ReduceTaskReply: fmt.Println("Reduce task assigned, bucket: ", assignedTask.(ReduceTaskReply).BucketNumber) handleReduceTask(reducef, assignedTask.(ReduceTaskReply)) - fmt.Println("Reduce task completed") CallJobFinish(assignedTask.(ReduceTaskReply).JobId) case ExitTaskReply: fmt.Println("Exit task assigned") @@ -274,6 +303,8 @@ func CallGetTask() TaskReply { // returns false if something goes wrong. func call(rpcname string, args interface{}, reply interface{}) bool { // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() sockname := coordinatorSock() c, err := rpc.DialHTTP("unix", sockname) if err != nil { @@ -281,11 +312,15 @@ func call(rpcname string, args interface{}, reply interface{}) bool { } defer c.Close() - err = c.Call(rpcname, args, reply) - if err == nil { + // Make the RPC call with the context + callch := c.Go(rpcname, args, reply, nil) + select { + case <-ctx.Done(): + return false + case <-callch.Done: + if callch.Error != nil { + return false + } return true } - - fmt.Println(err) - return false }