要求简介
实现 mrsequential.go 中的单词计数的分布式版本(其实只是单机跑)
要求跟论文2.1的单词统计挺像的,就是论文里是master分配任务,我们的程序要求worker去向coordinator要任务
整体框架图
再看一下生成上述文件名的四个函数:
Copy // 生成临时 map file name
func genTmpMapName(wid, ti, r int) string {
return fmt.Sprintf("tmp-worker-%d-%d-%d", wid, ti, r)
}
// 生成最终map 输出文件
func genFinalMapName(taskIndex, n int) string {
return fmt.Sprintf("mr-%d-%d",taskIndex,n)
}
// 生成临时 reduce file name
func genTmpReduceName(wid, ti int) string {
return fmt.Sprintf("tmp-worker-%d-out-%d", wid, ti)
}
// s
func genFinaReduceName(index int) string {
return fmt.Sprintf("mr-out-%d",index)
}
开始
按lab1题目给我们的提示是:一个开始的方法是修改mr/worker.go的Worker(),向协调器发送一个要求任务的RPC。然后修改协调器,使其回应一个尚未启动的map任务的文件名。然后修改Worker以读取该文件并调用应用程序的Map函数,如mrsequential.go。
从提示和对整个mr包的阅读我们容易看出mr.Worker()是本次lab的着手点,因为tesh-mr.sh里面是启动了2-3个worker来测试,所有我们需要区分不同的终端产生的请求。并且由于worker需要不断向coordinator发送rpc请求任务,所有我们需要套一个死循环来请求rpc。
我们在请求rpc的时候需要拿取任务,也需要告知coordinator我们完成了哪些任务,方便coordinator的状态转换,这里有多种方法可以实现,像:
在这次请求时带上上一次请求执行的信息,告知coordinator我上次请求已经完成了
在完成时额外发送一个rpc,告知coordinator我这次请求已经完成了
因此可以大概写出worker函数的框架:
Copy func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
wid := os.Getpid()
log.Printf("worker %v started", wid)
var lastTaskIndex int
var lastTaskType string
// Your worker implementation here.
// worker 需要不断向coordinator要任务
for {
// rpc请求
args := AskTaskArgs{
WorkerPID: wid,
LastTaskIndex: lastTaskIndex,
LastTaskType: lastTaskType,
}
reply := AskTaskReply{}
call("Coordinator.AskForTask", &args, &reply)
// mr任务均完成,可以退出
if reply.Stage == "" {
log.Printf("所有任务均已完成")
break
}
if reply.TaskType == MapTask {
// 如果coordinator返回的是Map任务
} else if reply.TaskType == ReduceTask {
// 如果coordinator返回的是reduce任务
}
// 让rpc携带上次请求的任务信息
lastTaskType = reply.TaskType
lastTaskIndex = reply.TaskIndex
}
}
由上述的整体框架图,我们可知整个mr过程分为四个step
在step1开始前,coordinator需要有分配任务的阶段,这一阶段可在rpc请求中直接完成:
Copy func (c *Coordinator) AskForTask(args *AskTaskArgs, reply *AskTaskReply) error {
// 说明上一个是有任务的
if args.LastTaskType != "" {
c.mu.Lock()
// 处理map和reduce任务
c.mu.Unlock()
}
// 分配任务
task,ok := <- c.cn
if !ok {
return nil
}
c.mu.Lock()
defer c.mu.Unlock()
// 可能会有多个 worker同时来要任务
log.Printf("分配%v任务%v给worker %v",task.TaskType,task.TaskIndex,args.WorkerPID)
// 设置超时时间
task.Deadline = time.Now().Add(10*time.Second)
task.WorkerPID = args.WorkerPID
// 上面更改了,所以要重新写上去
c.tasks[genTaskKey(task.TaskIndex,task.TaskType)] = task
// 封装reply信息
reply.TaskType = task.TaskType
reply.TaskIndex = task.TaskIndex
reply.NReduce = c.NReduce
reply.NMap = c.NMap
reply.File = task.FileName
reply.Stage = c.stage
return nil
}
Step 1
step 1 是worker生成 map临时文件,步骤是:
worker向coordinator发送rpc请求,获取到任务,回复内容如下:
Copy type AskTaskReply struct {
Stage string // 状态
File string // 文件名
NReduce int // NReduce,reduce数
NMap int // 任务数
TaskIndex int // 任务索引
TaskType string // 任务类型
}
根据 reply.File 打开我们需要读取的文件
根据 ihash(val.Key) % reply.NReduce 写入临时文件
实现如下:
Copy if reply.TaskType == MapTask {
// 生成临时的文件名 tmp-worker-workerID-TaskIndex-_NReduce
fileName := reply.File
file, err := os.Open(fileName)
if err != nil {
log.Fatalf("open file %v err:%v", fileName, err)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v err:%v", fileName, err)
}
file.Close()
kva := mapf(fileName, string(content))
// 写进临时文件,按key的ihash进行分桶
hashed := make(map[int][]KeyValue)
for _, val := range kva {
key := ihash(val.Key) % reply.NReduce
hashed[key] = append(hashed[key], val)
}
// 写入中间文件
for j := 0; j < reply.NReduce; j++ {
name := genTmpMapName(wid, reply.TaskIndex, j)
mapfile, err := os.Create(name)
if err != nil {
log.Fatalf("create file %v err:%v", name, err)
}
for _, val := range hashed[j] {
fmt.Fprintf(mapfile, "%v\t%v\n", val.Key, val.Value)
}
mapfile.Close()
}
}
Step 2
step 2 是若worker带过来的rpc请求里有上一个TaskInfo,则说明上一个map任务已经完成,这时候就可以原子的重命名这些临时文件
coordinator的操作比较简单:
Copy // 说明上一个是有任务的
if args.LastTaskType != "" {
c.mu.Lock()
taskKey := genTaskKey(args.LastTaskIndex, args.LastTaskType)
if t,exist := c.tasks[taskKey];exist && t.WorkerPID == args.WorkerPID {
log.Printf(
"%s任务%d 在 %d 上完成\n",
t.TaskType, t.TaskIndex, args.WorkerPID)
if args.LastTaskType == MapTask {
// 如果是map任务,且属于这个主机
for i := 0; i < c.NReduce; i++ {
// 忽略掉workerID合并成 NReduce*TaskNum 个文件
err := os.Rename(genTmpMapName(args.WorkerPID, args.LastTaskIndex, i),
genFinalMapName(args.LastTaskIndex, i))
if err != nil {
log.Fatalf("os rename %v error:%v",
genTmpMapName(args.WorkerPID, args.LastTaskIndex, i), err)
}
}
} else if args.LastTaskType == ReduceTask {
}
// 删除现在的task
delete(c.tasks,taskKey)
// 如果没任务了就需要更改状态了
if len(c.tasks) == 0 {
c.transition(c.NMap,c.NReduce)
}
}
c.mu.Unlock()
}
step 3
step 3 是在map任务已经全部完成的前提下,worker开始处理reduce任务,这里涉及到状态转换:
Copy func (c *Coordinator) transition(files,nReduce int) {
// debug
if c.stage == MapTask {
log.Println("开始分配reduce任务")
c.stage = ReduceTask
// 将Reduce任务加进去
for i := 0;i < c.NReduce;i++ {
task := Task{
TaskType: ReduceTask,
// 0~NReudce-1
TaskIndex: i,
}
c.tasks[genTaskKey(task.TaskIndex,task.TaskType)] = task
c.cn <- task
}
}else if c.stage == ReduceTask {
log.Printf("mr结束,准备退出")
close(c.cn)
c.stage = ""
}
}
接着便是worker对reduce任务的处理:
Copy else if reply.TaskType == ReduceTask {
var contents []string
for i := 0; i < reply.NMap; i++ {
// 这个循环意在将NReduce*len(files)个文件重新分成NReduce个文件
// 即分为len(files)个tmpReduceFile
// i的范围是0~7,代表输入文件的数量,或者说Map任务的数量
// reply.TaskInfo.TaskIndex是0~9,代表Reduce任务的数量
// 这里把前面多个主机完成的Map任务给汇总起来
// Map任务的实现是通过ihash来选reduce,所以这里合并所有map产生的文件
// 变成NReduce个文件,每个reduce一份文件
fname := genFinalMapName(i, reply.TaskIndex)
file, err := os.Open(fname)
if err != nil {
log.Fatalf("open %v err:%v", fname, err)
}
// 读文件
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("read file %s err:%v", fname, err)
}
contents = append(contents, strings.Split(string(content), "\n")...)
}
var kva []KeyValue
for _, content := range contents {
// 如果一行为空,就不算单词
if strings.TrimSpace(content) == "" {
continue
}
s := strings.Split(content, "\t")
kva = append(kva, KeyValue{
Key: s[0],
Value: s[1],
})
}
// 排序
sort.Sort(ByKey(kva))
// 写入真正的reduce 临时文件
fname := genTmpReduceName(wid, reply.TaskIndex)
file, err := os.Create(fname)
if err != nil {
log.Fatalf("create file %v err:%v", fname, err)
}
i := 0
for i < len(kva) {
j := i + 1
for j < len(kva) && kva[j].Key == kva[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, kva[k].Value)
}
output := reducef(kva[i].Key, values)
fmt.Fprintf(file, "%v %v\n", kva[i].Key, output)
i = j
}
file.Close()
}
step 4
step 4 是处理worker完成的reduce任务,比较简单,不多说:
Copy func (c *Coordinator) AskForTask(args *AskTaskArgs, reply *AskTaskReply) error {
// 说明上一个是有任务的
if args.LastTaskType != "" {
c.mu.Lock()
taskKey := genTaskKey(args.LastTaskIndex, args.LastTaskType)
if t,exist := c.tasks[taskKey];exist && t.WorkerPID == args.WorkerPID {
...
} else if args.LastTaskType == ReduceTask {
// 如果是reduce任务,且属于这个主机
err := os.Rename(genTmpReduceName(args.WorkerPID, args.LastTaskIndex),
genFinaReduceName(args.LastTaskIndex))
if err != nil {
log.Fatalf("os rename %v error:%v",
genTmpReduceName(args.WorkerPID, args.LastTaskIndex), err)
}
}
// 删除现在的task
delete(c.tasks,taskKey)
// 如果没任务了就需要更改状态了
if len(c.tasks) == 0 {
c.transition(c.NMap,c.NReduce)
}
}
c.mu.Unlock()
}
// 分配任务
...
}
至此,lab 1完成,debug真的痛苦
记录一下三天的debug之旅,第一天写了又写,多种方案都不行,后面上网查了一些思路,看到了呆呆大神的思路讲解,也才终于清晰了很多。
可能出现的错误
我在测试的时候出现了一个错误
reduce parallelism test fail --- too few parallel reduces.
--- reduce parallelism test: FAIL
Copy 2022/01/02 15:16:36 分配reduce任务0给worker 5338
2022/01/02 15:16:36 Received reduce task 0 from coordinator
2022/01/02 15:16:37 Mark reduce task 0 as finished on worker 5338
2022/01/02 15:16:37 分配reduce任务1给worker 5338
2022/01/02 15:16:37 Received reduce task 1 from coordinator
2022/01/02 15:16:38 Mark reduce task 1 as finished on worker 5338
2022/01/02 15:16:38 分配reduce任务2给worker 5338
2022/01/02 15:16:38 Received reduce task 2 from coordinator
2022/01/02 15:16:39 Mark reduce task 2 as finished on worker 5338
2022/01/02 15:16:39 分配reduce任务3给worker 5338
2022/01/02 15:16:39 Received reduce task 3 from coordinator
2022/01/02 15:16:41 Mark reduce task 3 as finished on worker 5338
2022/01/02 15:16:41 分配reduce任务4给worker 5338
2022/01/02 15:16:41 Received reduce task 4 from coordinator
2022/01/02 15:16:43 Mark reduce task 4 as finished on worker 5338
2022/01/02 15:16:43 分配reduce任务5给worker 5338
2022/01/02 15:16:43 Received reduce task 5 from coordinator
2022/01/02 15:16:44 Mark reduce task 5 as finished on worker 5338
2022/01/02 15:16:44 分配reduce任务6给worker 5338
2022/01/02 15:16:44 Received reduce task 6 from coordinator
2022/01/02 15:16:45 Mark reduce task 6 as finished on worker 5338
2022/01/02 15:16:45 分配reduce任务7给worker 5338
2022/01/02 15:16:45 Received reduce task 7 from coordinator
2022/01/02 15:16:45 Mark reduce task 7 as finished on worker 5338
2022/01/02 15:16:45 分配reduce任务8给worker 5338
2022/01/02 15:16:45 Received reduce task 8 from coordinator
2022/01/02 15:16:45 Mark reduce task 8 as finished on worker 5338
2022/01/02 15:16:45 分配reduce任务9给worker 5338
2022/01/02 15:16:45 Received reduce task 9 from coordinator
2022/01/02 15:16:46 Mark reduce task 9 as finished on worker 5338
2022/01/02 15:16:46 All REDUCE tasks finished. Prepare to exit
从脚本输出可以看到:
开了两个窗口来接任务,map的时候还有两个,结果到了reduce的时候却没了,只剩下一个了,这样很显然就是错的,查看代码发现,代码中写了一行:
在transition函数中:
Copy // 清空管道
c.cn = make(chan Task,int(math.Max(float64(files),float64(nReduce))))
当时是想着,如果不清空管道,要是里面还有任务堆积,那下面的 c.cn <- task 不就会阻塞了吗(因为存在定时清除且放回channel的操作)
Copy for i := 0;i < c.NReduce;i++ {
task := Task{
TaskType: ReduceTask,
// 0~9
TaskIndex: i,
}
c.tasks[genTaskKey(task.TaskIndex,task.TaskType)] = task
c.cn <- task
}
后面发现,只要管道不为0,len(c.tasks)是不可能为0的,也就是说,只有当管道和任务同时清空的时候才代表map or reduce任务结束,因为要是任务被放回channel,那只能是代表失败了,那肯定是没有执行过delete操作的
注:early exit test 会报错,换一台Linux机或把 test-mr.sh 中 wait -n 的 -n 去掉