Introduction
In Lab 1 we will build a MapReduce system. It has two main parts: a worker process that calls the Map and Reduce functions and handles reading and writing files, and a coordinator process that hands out tasks to workers and copes with failed workers.
Before you start the lab, you must be familiar with concurrent programming and RPC in Go. Understanding the MapReduce paper is also essential.
Experiment Description
To get started with Lab 1, you should read the lab document.
Because the RPC layer is based on Unix sockets, you need a Linux system to finish the experiment (I chose Ubuntu 22.04.2).
A simple sequential MapReduce implementation is provided in src/main/mrsequential.go. The code is helpful for understanding how the whole system works.
The entry point of the system is main/mrcoordinator.go. When the program starts, it calls MakeCoordinator() to create a master. Next, the server() function starts listening on a Unix socket. Then the main process repeatedly calls Done() to check whether the whole MapReduce job has finished. All of the above functions must be implemented in mr/coordinator.go.
As for the worker, we should put our implementation in mr/worker.go, referring to mrsequential.go where helpful.
Use the script test-mr.sh to check whether the program passes. Sometimes Ctrl+C may not exit the program; in that case, use ps to find the pid of the mrcoordinator process and kill it.
Implementation
Execution Overview
The master first assigns all map tasks to workers. In the map phase, each worker generates intermediate files named "mr-X-Y", where X is the Map task number and Y is the Reduce task number. Then, in the reduce phase, a worker reads the intermediate files, processes them with the reduce function, and writes the results to "mr-out-Y".
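For example, using the ihash helper already provided in mr/worker.go, the reduce bucket Y for a key and the resulting file name could be computed like this (intermediateName is a hypothetical helper, not part of the skeleton; it uses fmt):

```go
// ihash(key) is provided by the lab skeleton; taken modulo nReduce it picks
// the reduce bucket Y that a key belongs to.
func intermediateName(mapTaskX, nReduce int, key string) string {
	y := ihash(key) % nReduce
	return fmt.Sprintf("mr-%d-%d", mapTaskX, y) // the "mr-X-Y" convention
}
```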
Master Implementation
No explicit lock is used; instead, thread safety of the data operations is ensured in other ways, described below.
First, the master allocates tasks to workers, so we need to define a task data structure. Filename is the input file handled by the map function, and Id is the task number; the same structure can describe either a map task or a reduce task, so the two kinds of tasks are merged into one type.
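A minimal sketch of such a structure, assuming the field names Filename and Id (the write-up does not spell them out); it lives in the mr package so both the coordinator and the workers can use it:

```go
package mr

// Task is handed from the coordinator to a worker. The same type serves both
// phases: Filename is only meaningful for a map task, while Id is the map
// task number (the X in mr-X-Y) or the reduce task number (the Y).
type Task struct {
	Filename string // input file for a map task
	Id       int    // map or reduce task number
}
```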
Then, the master should record the state of the system. There are three states, map, reduce, and finish, represented by an int32 (0, 1, 2). Since no lock is used, a channel is used for synchronization so that workers do not race when acquiring tasks. MapTaskNum and ReduceTaskNum record the number of map and reduce tasks. The system must also tolerate machine failures gracefully, so the working start time of each task is recorded as a Unix timestamp; a task that has been running for more than 10 seconds is considered a sign that its machine has failed. Each task's time has to be recorded and updated concurrently, which creates data races, so for good performance I used sync.Map.
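A hedged sketch of the coordinator state this describes, continuing in the same mr package; the constants follow the 0/1/2 encoding above, but the field names (and the helper counters and files slice) are assumptions rather than the exact solution:

```go
import "sync"

// The three system states, encoded as int32 so they can be read and written
// with the sync/atomic package.
const (
	MapState    int32 = 0
	ReduceState int32 = 1
	FinishState int32 = 2
)

type Coordinator struct {
	State         int32     // current phase, accessed only via sync/atomic
	MapTask       chan Task // map tasks waiting to be handed out
	ReduceTask    chan Task // reduce tasks waiting to be handed out
	MapTaskNum    int       // total number of map tasks
	ReduceTaskNum int       // total number of reduce tasks (nReduce)
	mapDone       int32     // finished map tasks (assumed helper counter)
	reduceDone    int32     // finished reduce tasks (assumed helper counter)
	files         []string  // original input files, kept so a timed-out map task can be re-queued (assumption)
	TaskTime      sync.Map  // task key -> Unix timestamp when the task was handed out
}
```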
When the system starts, the master pushes all map tasks into the MapTask channel and records the working start time in TaskTime.
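Continuing the sketch in mr/coordinator.go, MakeCoordinator (whose signature comes from the lab skeleton) could queue the tasks like this; note that in this simplified sketch the start timestamp is stored when a task is actually handed out in GetTask below, rather than here:

```go
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		State:         MapState,
		MapTask:       make(chan Task, len(files)),
		ReduceTask:    make(chan Task, nReduce),
		MapTaskNum:    len(files),
		ReduceTaskNum: nReduce,
		files:         files,
	}
	for i, f := range files {
		c.MapTask <- Task{Filename: f, Id: i} // queue every map task up front
	}
	go c.checkTimeout() // failure checker, sketched further below
	c.server()          // Unix-socket RPC server provided by the lab skeleton
	return c
}
```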
When a worker sends a request to the master, the reply should contain the task to execute, the worker number, the current system state, and a success flag confirming that the task was acquired.
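A sketch of the corresponding RPC types in mr/rpc.go; the field names are assumptions, and WorkerNum is interpreted here as the count of tasks in the other phase, which a worker needs to name or locate intermediate files:

```go
type GetTaskArgs struct{}

type GetTaskReply struct {
	Task      Task  // the task to execute
	WorkerNum int   // nReduce for a map task, number of map tasks for a reduce task (interpretation)
	State     int32 // MapState, ReduceState or FinishState
	Succeed   bool  // true if a task was actually handed out
}

type FinishTaskArgs struct {
	Id int // number of the task the worker has finished
}

type FinishTaskReply struct{}
```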
The master fills in the reply differently depending on the current state.
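One possible shape of the hand-out RPC, using the time and sync/atomic packages; the non-blocking select means a worker simply gets Succeed == false when no task is currently available:

```go
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
	reply.State = atomic.LoadInt32(&c.State)
	switch reply.State {
	case MapState:
		select {
		case t := <-c.MapTask:
			reply.Task = t
			reply.WorkerNum = c.ReduceTaskNum
			reply.Succeed = true
			c.TaskTime.Store(t.Id, time.Now().Unix()) // start the 10-second timer
		default:
			reply.Succeed = false // all map tasks handed out; the worker should retry later
		}
	case ReduceState:
		select {
		case t := <-c.ReduceTask:
			reply.Task = t
			reply.WorkerNum = c.MapTaskNum
			reply.Succeed = true
			c.TaskTime.Store(c.MapTaskNum+t.Id, time.Now().Unix()) // offset so reduce keys do not clash with map keys
		default:
			reply.Succeed = false
		}
	default: // FinishState
		reply.Succeed = false // nothing left to do; the worker can exit
	}
	return nil
}
```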
When a worker reports that it has finished its work, the master also handles the report differently depending on the state.
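A matching sketch of the completion RPC: in the map state it counts finished map tasks and, once all are done, queues the reduce tasks and flips the state; in the reduce state it flips to the finish state after the last reduce task. The counters are the assumed mapDone/reduceDone fields from the struct sketch, and this simplified version does not guard against a task being reported twice after a timeout re-queue:

```go
func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
	switch atomic.LoadInt32(&c.State) {
	case MapState:
		c.TaskTime.Delete(args.Id) // stop the failure timer for this task
		if atomic.AddInt32(&c.mapDone, 1) == int32(c.MapTaskNum) {
			for i := 0; i < c.ReduceTaskNum; i++ {
				c.ReduceTask <- Task{Id: i} // all maps done: queue every reduce task
			}
			atomic.StoreInt32(&c.State, ReduceState)
		}
	case ReduceState:
		c.TaskTime.Delete(c.MapTaskNum + args.Id)
		if atomic.AddInt32(&c.reduceDone, 1) == int32(c.ReduceTaskNum) {
			atomic.StoreInt32(&c.State, FinishState) // whole job finished
		}
	}
	return nil
}
```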
Finally, to tolerate machine failures, a task whose worker has failed must be reassigned to another worker. In this system, I create a goroutine that periodically (every 2 seconds) checks whether tasks have finished in time.
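A sketch of that goroutine: every 2 seconds it scans TaskTime, and any task whose hand-out timestamp is more than 10 seconds old is taken back and re-queued (the key offset distinguishes map from reduce tasks, matching the GetTask sketch):

```go
func (c *Coordinator) checkTimeout() {
	for {
		time.Sleep(2 * time.Second)
		now := time.Now().Unix()
		c.TaskTime.Range(func(key, value interface{}) bool {
			if now-value.(int64) > 10 { // the worker is considered to have failed
				id := key.(int)
				c.TaskTime.Delete(key)
				if id < c.MapTaskNum {
					c.MapTask <- Task{Filename: c.files[id], Id: id} // re-queue the map task
				} else {
					c.ReduceTask <- Task{Id: id - c.MapTaskNum} // re-queue the reduce task
				}
			}
			return true // keep iterating over the remaining entries
		})
	}
}
```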
Worker Implementation
The worker loops, fetching tasks from the master and executing them.
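A rough sketch of that loop, using the Worker signature and the call() RPC helper from the lab skeleton (plus the time package); doMap and doReduce are hypothetical helpers that do the actual file handling:

```go
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		reply := GetTaskReply{}
		if !call("Coordinator.GetTask", &GetTaskArgs{}, &reply) {
			return // the coordinator has exited, so the job is done
		}
		switch {
		case reply.State == FinishState:
			return
		case !reply.Succeed:
			time.Sleep(time.Second) // no task available right now; ask again later
		case reply.State == MapState:
			doMap(mapf, &reply) // hypothetical helper: writes the mr-X-Y files
			call("Coordinator.FinishTask", &FinishTaskArgs{Id: reply.Task.Id}, &FinishTaskReply{})
		case reply.State == ReduceState:
			doReduce(reducef, &reply) // hypothetical helper: writes mr-out-Y
			call("Coordinator.FinishTask", &FinishTaskArgs{Id: reply.Task.Id}, &FinishTaskReply{})
		}
	}
}
```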
The map and reduce work can reuse code from mrsequential.go for reading the Map input files, sorting the intermediate key/value pairs between Map and Reduce, and storing the Reduce output in files.
To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of writing into a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile (or os.CreateTemp in newer Go versions) to create a temporary file and os.Rename to atomically rename it.
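A minimal sketch of that trick (writeAtomically is a hypothetical helper, using io/ioutil and os); the temporary file is created in the current directory so that the final rename stays on the same filesystem and is therefore atomic:

```go
func writeAtomically(finalName string, write func(*os.File) error) error {
	tmp, err := ioutil.TempFile(".", "mr-tmp-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // harmless no-op after a successful rename
	if err := write(tmp); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), finalName) // readers never see a partial file
}
```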