Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pkg/
api.key
*-handin.tar.gz
*.so
mr-tmp/
lecture2/
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# mit-6.824
https://pdos.csail.mit.edu/6.824/schedule.html

See pull requests for changes in each lab.
1 change: 1 addition & 0 deletions src/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ mrtmp.*
/pbservice/x.txt
/kvpaxos/x.txt
*.so
mr-*
205 changes: 190 additions & 15 deletions src/mr/master.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,183 @@
package mr

import "log"
import "net"
import "os"
import "net/rpc"
import "net/http"
import (
"errors"
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"os"
"strings"
"sync"
"time"
)

// String values stored in Task.Status for a task that is not currently held
// by a worker: either still waiting to be handed out, or already finished.
const (
	unassigned string = "unassigned"
	completed  string = "completed"
)

// Assigned is the Task.Status value of a task that has been handed to a
// worker by GetTask and has not been reported complete yet.
// NOTE: values are built with positional literals elsewhere in this file, so
// the field order must not change.
type Assigned struct {
// Id of the worker holding the task, copied from GetTaskRequest.WorkerId.
WorkerId string
// Time the task was handed out; updateStatus compares it against its timeout.
AssignedTime time.Time
}

// Task is one unit of map or reduce work tracked by the master.
// NOTE: Task values are built with positional literals elsewhere in this
// file, so the field order below must not change.
type Task struct {
// Task id; also the key under which the task is stored in the master's maps.
Id string
Type string // "map" or "reduce" TODO: enum
Content []string // for map: the input file name; for reduce: initially an empty list, later all the intermediate files
Status interface{} // either the string "unassigned"/"completed", or an Assigned value
}

// isCompleted reports whether the task's Status is the plain string
// completed. A Status that holds anything other than a string (for example
// an Assigned value) counts as not completed.
func (task Task) isCompleted() bool {
	status, isString := task.Status.(string)
	return isString && status == completed
}

// toGetTaskResponse copies the task's id, type and content into response.
// No other field of response is written.
func (task Task) toGetTaskResponse(response *GetTaskResponse) {
	response.TaskId, response.TaskType, response.TaskContent = task.Id, task.Type, task.Content
}

// Master holds the state of one job: every map and reduce task, plus the
// mutex used by the RPC handlers and by Done when they touch that state.
type Master struct {
// Map tasks, keyed by task id.
MapTasks map[string]*Task
// Reduce tasks, keyed by task id.
ReduceTasks map[string]*Task
// Number of reduce tasks; sent to workers in GetTaskResponse.NReduce.
nReduce int
// Locked around task updates in the RPC handlers and around the scan in Done.
mux sync.Mutex
}

// updateStatus scans every task in m, puts timed-out assignments back to
// unassigned, and reports whether all tasks in m were completed.
//
// A task whose Status is an Assigned value older than the timeout is reset so
// that it can be handed out again. The reset is done in place on the existing
// *Task: replacing the map entry with a fresh Task would orphan any pointer to
// the old one that an RPC handler already holds.
//
// The result is true only when every task in m is completed, so an empty map
// yields true. The function does no locking of its own; Done calls it with
// the master's mutex held.
func updateStatus(m map[string]*Task) bool {
	const timeout = 10 * time.Second

	// Deliberately not named "completed": that would shadow the package-level
	// status constant inside this function.
	allDone := true
	now := time.Now()

	for id, task := range m {
		if !task.isCompleted() {
			allDone = false
		}
		// Printf, not Println: the verbs below must actually be formatted.
		fmt.Printf("%s %T\n", task.Id, task.Status)

		assigned, ok := task.Status.(Assigned)
		if !ok {
			continue
		}
		if now.Sub(assigned.AssignedTime) > timeout {
			fmt.Printf("Task %s is more than %.f seconds old, assigned time %s\n", id, timeout.Seconds(), assigned.AssignedTime)
			task.Status = unassigned
		}
	}

	return allDone
}

// map collection functions
// find returns a task from m for which f reports true. Map iteration order is
// unspecified, so when several tasks match, which one is returned is not
// deterministic. When nothing matches, it returns a pointer to an empty Task
// together with a non-nil error.
func find(m map[string]*Task, f func(Task) bool) (*Task, error) {
	for id := range m {
		if candidate := m[id]; f(*candidate) {
			return candidate, nil
		}
	}
	return new(Task), errors.New("No applicable task found")
}

// all reports whether f holds for every task in m; an empty map yields true.
// Once f has returned false for some task it is not invoked again.
func all(m map[string]*Task, f func(Task) bool) bool {
	for _, task := range m {
		if !f(*task) {
			return false
		}
	}
	return true
}

// isStatus reports whether task.Status is a plain string equal to status.
// A Status holding a non-string value (such as Assigned) never matches.
func isStatus(task Task, status string) bool {
	if current, isString := task.Status.(string); isString {
		return current == status
	}
	return false
}

// Your code here -- RPC handlers for the worker to call.

//
// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (m *Master) Example(args *ExampleArgs, reply *ExampleReply) error {
reply.Y = args.X + 1
// SubmitTask is the RPC handler a worker calls to report a finished task.
//
// args.TaskId names the finished task. For a map task, args.Files lists the
// intermediate files it produced; the third "-"-separated field of each name
// is taken as the id of the reduce task the file belongs to, and the name is
// appended to that reduce task's Content. For a reduce task only the status
// changes. A task that is already completed is left untouched, so a
// duplicate report is ignored; an unknown TaskId is ignored too. In all of
// those cases reply.Msg is set to "ok" and nil is returned.
//
// An error is returned, and no task is modified, when a file name in a map
// submission has fewer than three "-"-separated fields or names a reduce task
// that does not exist. Previously such input crashed the master with an index
// or nil-pointer panic.
func (m *Master) SubmitTask(args *SubmitTaskRequest, reply *SubmitTaskResponse) error {
	fmt.Printf("Got task submit request %s %s\n", args.TaskId, args.Files)

	// Hold the lock for the whole handler. The task maps and Status fields
	// are shared with GetTask and Done, so the reads need it as much as the
	// writes do.
	m.mux.Lock()
	defer m.mux.Unlock()

	if task, ok := m.MapTasks[args.TaskId]; ok {
		fmt.Printf("Submit map task %s\n", args.TaskId)
		if task.Status != completed {
			// Resolve every file first so a bad name cannot leave the
			// reduce tasks half updated.
			targets, err := m.reduceTargets(args.Files)
			if err != nil {
				return fmt.Errorf("submit map task %q: %w", args.TaskId, err)
			}
			for i, fname := range args.Files {
				targets[i].Content = append(targets[i].Content, fname)
			}
			task.Status = completed
		}
	}

	if task, ok := m.ReduceTasks[args.TaskId]; ok && task.Status != completed {
		fmt.Printf("Submit reduce task %s\n", args.TaskId)
		task.Status = completed
	}

	reply.Msg = "ok"
	return nil
}

// reduceTargets maps each intermediate file name to the reduce task it feeds,
// in the same order as files. It must be called with m.mux held.
func (m *Master) reduceTargets(files []string) ([]*Task, error) {
	targets := make([]*Task, 0, len(files))
	for _, fname := range files {
		parts := strings.Split(fname, "-")
		if len(parts) < 3 {
			return nil, fmt.Errorf("malformed intermediate file name %q", fname)
		}
		target, ok := m.ReduceTasks[parts[2]]
		if !ok {
			return nil, fmt.Errorf("intermediate file %q names unknown reduce task %q", fname, parts[2])
		}
		targets = append(targets, target)
	}
	return targets, nil
}

// GetTask is the RPC handler a worker calls to ask for work.
//
// The outcome is reported through reply:
//   - every map and reduce task is completed: reply.Err = AllTasksComplete;
//   - an unassigned map task exists: it is handed out;
//   - all map tasks are completed and an unassigned reduce task exists: it is
//     handed out (reduce work never starts while a map task is unfinished);
//   - otherwise: reply.Err = NoTaskAvailable.
//
// Handing out a task fills reply's TaskId, TaskType, TaskContent and NReduce,
// and sets the task's Status to Assigned with args.WorkerId and the current
// time. The returned error is always nil.
func (m *Master) GetTask(args *GetTaskRequest, reply *GetTaskResponse) error {
	workerID := args.WorkerId
	fmt.Printf("Got GetTask request from worker %s\n", workerID)

	isUnassigned := func(task Task) bool { return isStatus(task, unassigned) }
	isCompleted := func(task Task) bool { return isStatus(task, completed) }

	// Selecting a task and marking it assigned must happen under one lock
	// acquisition. Otherwise two concurrent callers can both find the same
	// unassigned task and both receive it.
	m.mux.Lock()
	defer m.mux.Unlock()

	mapsDone := all(m.MapTasks, isCompleted)
	if mapsDone && all(m.ReduceTasks, isCompleted) {
		reply.Err = AllTasksComplete
		return nil
	}

	// While any map task is unfinished only map tasks may be handed out;
	// afterwards only reduce tasks remain.
	pool := m.MapTasks
	if mapsDone {
		pool = m.ReduceTasks
	}

	task, err := find(pool, isUnassigned)
	if err != nil {
		reply.Err = NoTaskAvailable
		return nil
	}

	task.toGetTaskResponse(reply)
	reply.NReduce = m.nReduce
	task.Status = Assigned{WorkerId: workerID, AssignedTime: time.Now()}
	return nil
}

//
// start a thread that listens for RPCs from worker.go
Expand All @@ -46,10 +200,16 @@ func (m *Master) server() {
// if the entire job has finished.
//
// Done reports whether every map and reduce task is completed. As a side
// effect it runs updateStatus over both task maps, which puts timed-out
// assignments back to unassigned so they can be handed out again.
func (m *Master) Done() bool {
	// The original note here says Done is called every second, which is why
	// the periodic timeout scan is hooked into it.
	m.mux.Lock()
	defer m.mux.Unlock()

	// Evaluate both scans explicitly. Joining the calls with && would skip
	// the reduce scan whenever a map task is still unfinished.
	mapsDone := updateStatus(m.MapTasks)
	reducesDone := updateStatus(m.ReduceTasks)
	ret := mapsDone && reducesDone

	// %t is the verb for booleans; %b prints "%!b(bool=...)".
	fmt.Printf("Done checking result %t\n", ret)

	return ret
}
Expand All @@ -61,9 +221,24 @@ func (m *Master) Done() bool {
//
func MakeMaster(files []string, nReduce int) *Master {
m := Master{}
// create map tasks
m.MapTasks = map[string]*Task{}
m.ReduceTasks = map[string]*Task{}
m.nReduce = nReduce

for i, fname := range files {
taskId := fmt.Sprintf("map%d", i)
m.MapTasks[taskId] = &Task{taskId, mapTask, []string{fname}, unassigned}
}

// Your code here.
for i := 0; i < nReduce; i++ {
taskId := fmt.Sprintf("%d", i)
m.ReduceTasks[taskId] = &Task{taskId, reduceTask, []string{}, unassigned}
}

fmt.Printf("Master initialized, map tasks total %d, reduce tasks total %d\n", len(m.MapTasks), len(m.ReduceTasks))

// Your code here.

m.server()
return &m
Expand Down
41 changes: 30 additions & 11 deletions src/mr/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,43 @@ package mr
// remember to capitalize all names.
//

import "os"
import "strconv"
import (
"os"
"strconv"
)

//
// example to show how to declare the arguments
// and reply for an RPC.
//
// RPC types for mapreduce

type ExampleArgs struct {
X int
// GetTaskRequest is the argument of the Master.GetTask RPC.
type GetTaskRequest struct {
// Id of the calling worker; the master stores it in the Assigned status of
// the task it hands out.
WorkerId string
}

type ExampleReply struct {
Y int
// GetTaskResponse is the reply of the Master.GetTask RPC. When the master
// hands out a task it fills TaskId, TaskType, TaskContent and NReduce; when it
// does not, it only sets Err (to NoTaskAvailable or AllTasksComplete).
type GetTaskResponse struct {
TaskId string
// "map" or "reduce".
TaskType string
TaskContent []string // for map: single filename; for reduce: a list of intermediate file names
NReduce int // TODO: separate a "getMeta" RPC call?
Err string // TODO: make it more elegant?
}

// Add your RPC definitions here.
// SubmitTaskRequest is the argument of the Master.SubmitTask RPC, sent by a
// worker to report a finished task.
type SubmitTaskRequest struct {
// Id of the finished task, as received in GetTaskResponse.TaskId.
TaskId string
// Files produced by the task. For a map task the master splits each name on
// "-" and uses the third field as the id of the reduce task the file feeds.
Files []string
}

// SubmitTaskResponse is the reply of the Master.SubmitTask RPC.
type SubmitTaskResponse struct {
// Set to "ok" by the master's SubmitTask handler.
Msg string
}

// Task type names, used for Task.Type when the master creates its tasks.
const (
	mapTask    = "map"
	reduceTask = "reduce"
)

// errors

// Strings the master places in GetTaskResponse.Err when it does not hand out
// a task.
const (
	NoTaskAvailable  = "No task currently available"
	AllTasksComplete = "All tasks have completed"
)

// Add your RPC definitions here.

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the master.
Expand Down
Loading