Introduction
In Lab 1 we will build a MapReduce system. Two main parts are implemented: a worker process that calls the Map and Reduce functions and handles reading and writing files, and a coordinator process that hands out tasks to workers and copes with failed workers.
Before you start the lab, you must be familiar with concurrent programming and RPC in Go. Understanding the MapReduce paper is also essential.
Experiment Description
To get started with Lab 1, you should read the lab document.
Because the RPC layer is based on Unix sockets, you need a Linux system to finish the experiment (I chose Ubuntu 22.04.2).
A simple sequential MapReduce implementation is provided in src/main/mrsequential.go. The code is helpful for understanding how the whole system works.
The entry point of the system is main/mrcoordinator.go. When the program starts, it calls MakeCoordinator() to create a master. Next, the server() function starts listening on a Unix socket. Finally, the main process polls Done() to check whether the whole MapReduce job has finished. All of these functions must be implemented in mr/coordinator.go.
As for the worker, we should put our implementation in mr/worker.go, referring to mrsequential.go.
Use the script test-mr.sh to check that the program passes. Sometimes Ctrl+C might not exit the program; in that case, use ps to find the pid of the mrcoordinator process and kill it.
Implementation
Execution Overview
The master first assigns all map tasks to workers. In the map state, each worker generates intermediate files "mr-X-Y", where X is the map task number and Y is the reduce task number. Then, in the reduce state, each worker reads the intermediate files, processes them with the reduce function, and writes the results to "mr-out-Y".
Master Implementation
No mutex lock is used; instead, the data operations are made thread-safe by other means.
First, the master allocates tasks to workers, so we should define a task data structure. Filename is the input file handled by the map function, and Id is the worker number, which can belong to either a map worker or a reduce worker, so the two kinds of task can be merged into one structure.
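The merged task structure could be sketched as follows (field names here are my own illustration, not mandated by the lab):

```go
package main

import "fmt"

// Task describes both map and reduce work in one structure.
// For a map task, Filename is the input split to read; for a
// reduce task, Filename is empty and Id names the partition Y
// whose "mr-X-Y" intermediate files should be reduced.
type Task struct {
	Filename string // input file (map tasks only)
	Id       int    // map task number X, or reduce task number Y
}

func main() {
	mapTask := Task{Filename: "pg-grimm.txt", Id: 0}
	reduceTask := Task{Id: 3} // reduce tasks carry only a partition number
	fmt.Println(mapTask.Filename, reduceTask.Id)
}
```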
Then, the master should record the state of the system. There are three states (map, reduce, and finished), represented as an int32 (0, 1, 2). Because we don't use a lock, workers race for tasks through a channel, which provides the synchronization. MapTaskNum and ReduceTaskNum record how many tasks of each kind have been created. The system must also tolerate machine failures gracefully: we record each task's working start time as a Unix timestamp, and a task that has been running for more than 10 seconds is treated as a sign that its machine has failed. Each task's time has to be recorded and updated concurrently, which would otherwise be a data race; to get good performance, I used sync.Map.
As the system begins, the master pushes the map tasks into the MapTask channel and records each task's start time in TaskTime.
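A minimal sketch of that startup step (the lab's real MakeCoordinator also takes nReduce and starts the RPC server; this only shows the queueing):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	Filename string
	Id       int
}

type Coordinator struct {
	MapTask  chan Task
	TaskTime sync.Map // Task -> int64 Unix start time
}

// MakeCoordinator queues one map task per input file and stamps the
// time each task entered the queue.
func MakeCoordinator(files []string) *Coordinator {
	c := &Coordinator{MapTask: make(chan Task, len(files))}
	for i, f := range files {
		t := Task{Filename: f, Id: i}
		c.MapTask <- t
		c.TaskTime.Store(t, time.Now().Unix())
	}
	return c
}

func main() {
	c := MakeCoordinator([]string{"a.txt", "b.txt"})
	first := <-c.MapTask
	fmt.Println(first.Filename, len(c.MapTask)) // a.txt 1
}
```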
When a worker sends a request to the master, it should get back a task, a worker number, the system state, and a success flag confirming that a task was actually acquired.
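The request/reply pair might be declared like this (the field names are illustrative; only the shape of the information comes from the description above):

```go
package main

import "fmt"

type Task struct {
	Filename string
	Id       int
}

// GetTaskArgs is empty: the worker only announces that it wants work.
type GetTaskArgs struct{}

// GetTaskReply carries everything listed above: the task itself,
// the worker number, the system state, and a success flag.
type GetTaskReply struct {
	T         Task  // the assigned task
	WorkerNum int   // number identifying this map or reduce worker
	State     int32 // 0 = map, 1 = reduce, 2 = finished
	Succeed   bool  // true only if a task was actually acquired
}

func main() {
	r := GetTaskReply{T: Task{Filename: "pg.txt", Id: 1}, WorkerNum: 1, Succeed: true}
	fmt.Println(r.Succeed, r.T.Filename)
}
```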
The master sends a different reply to workers depending on the current state.
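One way to sketch that handler, assuming the coordinator layout described earlier: a non-blocking channel receive lets the master report failure (so the worker retries) instead of blocking when every task of the current phase is already in flight.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const (
	MapPhase    int32 = 0
	ReducePhase int32 = 1
	FinishPhase int32 = 2
)

type Task struct {
	Filename string
	Id       int
}

type GetTaskArgs struct{}

type GetTaskReply struct {
	T       Task
	State   int32
	Succeed bool
}

type Coordinator struct {
	State      int32
	MapTask    chan Task
	ReduceTask chan Task
	TaskTime   sync.Map
}

// GetTask hands out a task according to the current phase and restarts
// the task's failure timer. Succeed stays false if no task is free.
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
	reply.State = atomic.LoadInt32(&c.State)
	var queue chan Task
	switch reply.State {
	case MapPhase:
		queue = c.MapTask
	case ReducePhase:
		queue = c.ReduceTask
	default:
		return nil // finished: nothing left to hand out
	}
	select {
	case t := <-queue:
		reply.T, reply.Succeed = t, true
		c.TaskTime.Store(t, time.Now().Unix()) // restart the failure timer
	default:
		// all tasks of this phase are in flight; worker should retry
	}
	return nil
}

func main() {
	c := &Coordinator{MapTask: make(chan Task, 1), ReduceTask: make(chan Task, 1)}
	c.MapTask <- Task{Filename: "pg.txt", Id: 0}
	var reply GetTaskReply
	c.GetTask(&GetTaskArgs{}, &reply)
	fmt.Println(reply.Succeed, reply.T.Filename)
}
```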
When a worker finishes a task, the master likewise handles the report differently in each state.
Finally, to tolerate machine failures, we should reassign a failed machine's task to another worker. In this system, I create a goroutine that checks task progress periodically (every 2 seconds).
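The checker could look like this sketch, simplified to requeue into the map channel only (a full version would pick the queue matching the current phase). The scan is split out of the ticker loop so it can be exercised directly:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	Filename string
	Id       int
}

type Coordinator struct {
	MapTask  chan Task
	TaskTime sync.Map // Task -> int64 Unix start time
}

// requeueStale scans TaskTime and hands any task that has been running
// for more than 10 seconds back to the queue, assuming its worker died.
// It returns how many tasks were requeued.
func (c *Coordinator) requeueStale(now int64) int {
	n := 0
	c.TaskTime.Range(func(k, v any) bool {
		if now-v.(int64) > 10 {
			t := k.(Task)
			c.TaskTime.Delete(t)
			c.MapTask <- t // another worker will pick it up
			n++
		}
		return true
	})
	return n
}

// startChecker launches the background goroutine that runs the scan
// every 2 seconds, as described above.
func (c *Coordinator) startChecker() {
	go func() {
		for {
			time.Sleep(2 * time.Second)
			c.requeueStale(time.Now().Unix())
		}
	}()
}

func main() {
	c := &Coordinator{MapTask: make(chan Task, 2)}
	c.TaskTime.Store(Task{Filename: "old.txt"}, time.Now().Unix()-30) // stale
	c.TaskTime.Store(Task{Filename: "new.txt"}, time.Now().Unix())   // healthy
	fmt.Println(c.requeueStale(time.Now().Unix())) // 1
}
```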
Worker Implementation
The worker loops, fetching tasks from the master and executing them.
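The loop can be sketched as below. The RPC calls and the map/reduce work are abstracted as function parameters so the sketch runs standalone; the real worker issues them over the Unix socket:

```go
package main

import (
	"fmt"
	"time"
)

const (
	MapPhase    int32 = 0
	ReducePhase int32 = 1
	FinishPhase int32 = 2
)

type Task struct {
	Filename string
	Id       int
}

type GetTaskReply struct {
	T       Task
	State   int32
	Succeed bool
}

// Worker loops forever: ask for a task, run it, report completion.
// It exits when the master says the whole job is finished, and backs
// off briefly when no task is currently free.
func Worker(getTask func() GetTaskReply, finish func(Task), doTask func(Task, int32)) {
	for {
		reply := getTask()
		if reply.State == FinishPhase {
			return // whole job is done
		}
		if !reply.Succeed {
			time.Sleep(100 * time.Millisecond) // nothing free yet; retry
			continue
		}
		doTask(reply.T, reply.State)
		finish(reply.T)
	}
}

func main() {
	// A stub master that hands out one map task, then reports completion.
	replies := []GetTaskReply{
		{T: Task{Filename: "pg.txt"}, State: MapPhase, Succeed: true},
		{State: FinishPhase},
	}
	i, done := 0, 0
	Worker(
		func() GetTaskReply { r := replies[i]; i++; return r },
		func(Task) { done++ },
		func(t Task, s int32) { fmt.Println("running", t.Filename) },
	)
	fmt.Println("completed", done)
}
```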
The map and reduce work can reuse some code from mrsequential.go for reading map input files, sorting intermediate key/value pairs between the map and reduce phases, and storing reduce output in files.
To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of writing to a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.