Lab1具体实现

要求简介

实现 mrsequential.go 中的单词计数的分布式版本(其实只是单机跑)

要求跟论文2.1的单词统计挺像的,就是论文里是master分配任务,我们的程序要求worker去向coordinator要任务

整体框架图

image-20220103013740271

再看一下生成上述文件名的四个函数:

开始

按lab1题目给我们的提示是:一个开始的方法是修改mr/worker.go的Worker(),向协调器发送一个要求任务的RPC。然后修改协调器,使其回应一个尚未启动的map任务的文件名。然后修改Worker以读取该文件并调用应用程序的Map函数,如mrsequential.go。

从提示和对整个mr包的阅读我们容易看出mr.Worker()是本次lab的着手点,因为tesh-mr.sh里面是启动了2-3个worker来测试,所有我们需要区分不同的终端产生的请求。并且由于worker需要不断向coordinator发送rpc请求任务,所有我们需要套一个死循环来请求rpc。

我们在请求rpc的时候需要拿取任务,也需要告知coordinator我们完成了哪些任务,方便coordinator的状态转换,这里有多种方法可以实现,像:

  • 在这次请求时带上上一次请求执行的信息,告知coordinator我上次请求已经完成了

  • 在完成时额外发送一个rpc,告知coordinator我这次请求已经完成了

因此可以大概写出worker函数的框架:

由上述的整体框架图,我们可知整个mr过程分为四个step

在step1开始前,coordinator需要有分配任务的阶段,这一阶段可在rpc请求中直接完成:

Step 1

step 1 是worker生成 map临时文件,步骤是:

  • worker向coordinator发送rpc请求,获取到任务,回复内容如下:

  • 根据 reply.File 打开我们需要读取的文件

  • 根据 ihash(val.Key) % reply.NReduce 写入临时文件

实现如下:

Step 2

step 2 是若worker带过来的rpc请求里有上一个TaskInfo,则说明上一个map任务已经完成,这时候就可以原子的重命名这些临时文件

coordinator的操作比较简单:

step 3

step 3 是在map任务已经全部完成的前提下,worker开始处理reduce任务,这里涉及到状态转换:

接着便是worker对reduce任务的处理:

step 4

step 4 是处理worker完成的reduce任务,比较简单,不多说:

至此,lab 1完成,debug真的痛苦

记录一下三天的debug之旅,第一天写了又写,多种方案都不行,后面上网查了一些思路,看到了呆呆大神的思路讲解,也才终于清晰了很多。

image-20220102155301075

可能出现的错误

我在测试的时候出现了一个错误

reduce parallelism test fail --- too few parallel reduces.

--- reduce parallelism test: FAIL

从脚本输出可以看到:

开了两个窗口来接任务,map的时候还有两个,结果到了reduce的时候却没了,只剩下一个了,这样很显然就是错的,查看代码发现,代码中写了一行:

​ 在transition函数中:

当时是想着,如果不清空管道,要是里面还有任务堆积,那下面的 c.cn <- task 不就会阻塞了吗(因为存在定时清除且放回channel的操作)

后面发现,只要管道不为0,len(c.tasks)是不可能为0的,也就是说,只有当管道和任务同时清空的时候才代表map or reduce任务结束,因为要是任务被放回channel,那只能是代表失败了,那肯定是没有执行过delete操作的

注:early exit test 会报错,换一台Linux机或把 test-mr.sh 中 wait -n 的 -n 去掉

Last updated