Lab1具体实现
要求简介
实现 mrsequential.go 中的单词计数的分布式版本(其实只是单机跑)
要求跟论文2.1的单词统计挺像的,就是论文里是master分配任务,我们的程序要求worker去向coordinator要任务
整体框架图

再看一下生成上述文件名的四个函数:
开始
按lab1题目给我们的提示是:一个开始的方法是修改mr/worker.go的Worker(),向协调器发送一个要求任务的RPC。然后修改协调器,使其回应一个尚未启动的map任务的文件名。然后修改Worker以读取该文件并调用应用程序的Map函数,如mrsequential.go。
从提示和对整个mr包的阅读我们容易看出mr.Worker()是本次lab的着手点,因为tesh-mr.sh里面是启动了2-3个worker来测试,所有我们需要区分不同的终端产生的请求。并且由于worker需要不断向coordinator发送rpc请求任务,所有我们需要套一个死循环来请求rpc。
我们在请求rpc的时候需要拿取任务,也需要告知coordinator我们完成了哪些任务,方便coordinator的状态转换,这里有多种方法可以实现,像:
在这次请求时带上上一次请求执行的信息,告知coordinator我上次请求已经完成了
在完成时额外发送一个rpc,告知coordinator我这次请求已经完成了
因此可以大概写出worker函数的框架:
由上述的整体框架图,我们可知整个mr过程分为四个step
在step1开始前,coordinator需要有分配任务的阶段,这一阶段可在rpc请求中直接完成:
Step 1
step 1 是worker生成 map临时文件,步骤是:
worker向coordinator发送rpc请求,获取到任务,回复内容如下:
根据 reply.File 打开我们需要读取的文件
根据 ihash(val.Key) % reply.NReduce 写入临时文件
实现如下:
Step 2
step 2 是若worker带过来的rpc请求里有上一个TaskInfo,则说明上一个map任务已经完成,这时候就可以原子的重命名这些临时文件
coordinator的操作比较简单:
step 3
step 3 是在map任务已经全部完成的前提下,worker开始处理reduce任务,这里涉及到状态转换:
接着便是worker对reduce任务的处理:
step 4
step 4 是处理worker完成的reduce任务,比较简单,不多说:
至此,lab 1完成,debug真的痛苦
记录一下三天的debug之旅,第一天写了又写,多种方案都不行,后面上网查了一些思路,看到了呆呆大神的思路讲解,也才终于清晰了很多。

可能出现的错误
我在测试的时候出现了一个错误
reduce parallelism test fail --- too few parallel reduces.
--- reduce parallelism test: FAIL
从脚本输出可以看到:
开了两个窗口来接任务,map的时候还有两个,结果到了reduce的时候却没了,只剩下一个了,这样很显然就是错的,查看代码发现,代码中写了一行:
在transition函数中:
当时是想着,如果不清空管道,要是里面还有任务堆积,那下面的 c.cn <- task 不就会阻塞了吗(因为存在定时清除且放回channel的操作)
后面发现,只要管道不为0,len(c.tasks)是不可能为0的,也就是说,只有当管道和任务同时清空的时候才代表map or reduce任务结束,因为要是任务被放回channel,那只能是代表失败了,那肯定是没有执行过delete操作的
注:early exit test 会报错,换一台Linux机或把 test-mr.sh 中 wait -n 的 -n 去掉
Last updated