Cluster computing systems, such as MapReduce, Hadoop and Dryad etc, have become
a popular framework for data-intensive applications. Among various kinds of
sizes in the cluster, throughput and job completion time are the main factors
affecting the data-centers costs
and users satisfaction. They
were thought as the most important measures of the computation efficiency.
In recent years, many studies have been done on data locality. A distributed
adaptive data replication algorithm DARE (Abad et al.,
2011) that aids the scheduler to achieve better data locality was proposed.
DARE improves data locality by more than 7 times with the FIFO scheduler in
Hadoop and achieves more than 85% data locality for the FAIR scheduler with
In order to improve the data locality of map tasks, a Next-k-node Scheduling
(NKS) (Cheng et al., 2012) was proposed, implementing
the NKS method in hadoop-0.20.2. Experiment results have shown that the NKS
method reduced 78% of the map tasks processed without node locality; reduced
77% of the network load caused by the tasks and improved the performance of
Hadoop MapReduce when comparing with the default task scheduling method in Hadoop.
When reassigning the slots, FAIR picks the most recently launched tasks to
kill without considering the job character and data locality which increases
the network traffic while rescheduling the killed Map/Reduce tasks. Focused
on this problem an improved FAIR scheduling algorithm (Tao
et al., 2011) was proposed, which take into account the job character
and data locality while killing tasks to make slots for new users. In order
to improve the scheduling efficiency, the dynamic processing slots scheduling
(Kurazumi et al., 2012) was used for the I/O
intensive jobs of Hadoop MapReduce focusing on I/O wait during execution of
An elastic replication management system for HDFS, which provides an active/standby
storage model for HDFS (Cheng et al., 2012),
was proposed. It utilizes a complex event processing engine to distinguish real-time
data types and then dynamically increases extra replicas for hot data, cleans
up these extra replicas when the data cool down and uses erasure codes for cold
A method of energy-conserving, hybrid, logical multi-zoned (hot zone and cold
zone) variant of HDFS (Kaushik and Bhandarkar, 2010)
was proposed to manage data-processing intensive, commodity Hadoop cluster.
Analysis of the traces of a Yahoo! Hadoop cluster showed significant heterogeneity
in the data's access patterns which can be used to guide energy-aware data placement
policies. The trace-driven simulation results with three-month-long real-life
HDFS traces from a Hadoop cluster at Yahoo! show a 26% energy consumption reduction
by doing only Cold zone power management.
It is a common practice for data-intensive systems to place the data as close
as possible when calculating, which referred to as the data locality problem.
Due to the significant impaction on system throughput and job completion time,
the data locality problem has become an important problem in data-intensive
systems. HDFS adopts a uniform triplication policy (i.e., three replicas for
each file) to improve the data locality. The policy can also ensure data availability
and fault tolerance in the event of data and disk failures. This policy could
also achieve load balancing by distributing work across the replicas. Though
high reliability and performance the policy has in most cases, there are two
First, in a large and busy HDFS cluster, the hot data (Chen
et al., 2010) could be requested by many distributed clients concurrently.
Replicating the hot datas only on three different nodes is not enough to avoid
contention of the data nodes which store hotdatas. If the number of jobs which
concurrently access to hot data exceeds that of replicas, some of these jobs
may have to access to the data remotely and compete for the same replica. Second,
the triplication policy comes with a high over head cost in the management for
the cold data. Too many replicas may not improve availability significantly,
but bring unnecessary expenditure instead. The management cost, including storage
and network band width, will significantly increase with the increasing number
Therefore, the hot data should be assigned with a larger number of replicas
to improve data locality of concurrent accesses. The additional copies may be
used not only to not only improve availability, but also to provide load balancing
and improve overall performance if replicas and data accessing requests are
reasonably distributed. As a result, dynamic configuration of file replica is
necessary (Berral et al., 2011).
Here, we will give a generalized introduction of a method of job type classification
and after that a dynamic replica policy was put forward.
Job type classification: Since the different types of job have different
amount of requests on HDFS, the heterogeneity of a job type should be taken
into consideration. In this part, we classify jobs into cluster according to
the following method.
The execution time of a map task is the total time spent on fetching the output
of the map tasks and writing results to HDFS. In the following we decouple the
time consumption on I/O operations and CPU operations of a MapReduce application.
The execution time for a map/reduce task is then defined as:
TaskExecutionTime = OT + CT + IOT
where, OT is the fixed overhead in running a task, CT and IOT are times taken
in CPU and IO operations, respectively. OT is independent of data size which
mainly includes JVM (Java virtual machine) initialization overhead and scheduling
time. CPU-related operations mostly occur in the user-defined map and reduce
function. Broadly, IO operations can be classified as follows:
||Input and output for a map/reduce task
||Reading and writing for sorting data in a map/reduce task
||Shuffle for a reduce task
CT and IOT are two parts, distinguishing from other types of task, to represent
the characteristic of a task. The ratio between them is denoted by Computing
Rate (CR) and I/O rate, respectively. The I/O rate of a task is the total amount
of input and output of a task divided by task execution time. Since, the Hadoop
framework uses cache mechanism and temporary files for sorting, the accurate
total amount of input and output of a task is difficult to be counted. Thus,
in this study, we adopt CR to represent the characteristic of a task which is
CR = CT/Task ExecutionTime = CT / (OT
+ CT + IOT)
If a tasks CR reaches to 1,
the task is regarded CPU intensive, or I/O intensive if CR is close to 0.
The LPL was validated by running 19 bench marks, including machine learning
jobs (Lu et al., 2012), web search jobs and some
typical MapReduce benchmark jobs etc. According to their results, an I/O intensive
job can be defined as both of its map CR and reduce CR is less than 0.2. Now,
reduce tasks in 16 jobs of 17 benchmarks (94%) are able to be reflected correctly.
Dynamic replica policy: In this part, we present a dynamic replica management
based on HDFS, including a dynamic adjustment and selection mechanism of replica.
After that we implement a experiment on the method replica controller based
on HDFS. The overall architecture of the system is shown in Fig.
|| System architecture
The following presents the main module and function of the replica controller:
||History Logs: Once name node receives file read requests
from clients, the request logs will be first stored in buffer first, then
be sent to the history Logs of replica system in every t sec
||Timer: The main function is to prevent the situation that a file
read request from the client surge in a short time and then a sharp decline,
leading to corresponding changes of the number of copies in a short period
of time. These changes may increase the burden on system. Its cycle equals
||Controller: It has two main functions. The first is to modify the
BlockMap in Namenode to adjust numbers of replica. The second is to select
the suitable nodes by analysis of history logs information, then add or
delete replica to this selected nodes
Dynamic adjustment mechanism of replica: In order to eliminate the performance
bottleneck of HDFS and achieve dynamic adjustment of replica. We made some small
changes on the basis of the original HDFS. Let variable which represents the
number of replica in fsimage as the min replica number (minReplica) and add
two variables numReplica and connectCounter to the attribution of HDFS. NumReplica
represents current numbers of replica, its initial value equals minReplica.
Replica system will adjust the number of replica dynamically according to value
of numReplica. ConnectCounter represents the number of users that read the same
file in the same time.
Pseudocode of file read and dynamic adjustment process is shown as Table
In this pseudocode, ρmax = 0.8xLmax xnum Replica,
ρmin = sto Lmax is the max link number
of each replica and st represents the standard replica number in fsimage.
Dynamic selection mechanism of replica: Here, a copy selection method
based on the stage of history information is proposed. This method will describe
that which data node should be assigned.
In general, an arbitrary undirected or bidirectional graph G(V, E) is taken
into consideration. V is node set V1∩V2 ... Vh
... ∩Vk = Φ, Vi is sub set of V and represents
data node in different region. E is link set between two nodes and E⊂VxV.
The link between any two nodes ni and nj is given a weight
value of d(i, j), it is the delay between them. In this mechanism, assign value
X to each node Xi, represent whether the replica of file X is stored
on node Hi. If Xi = 1, indicates that there are copies
of the file on the node ni. V'h represents node set which
has copies of file in region Vh.
Select node which needs to be added to file replica as follows:
First, in order to determine adding file replica in which domain, replica controller
will divide records into R = r1 ∪ r2 ∪ ... ∪
rh ... rk according to history information, rk
represents number of requests sent by client in region Vk.
Second, replica controller will select region
of node that need to add replica:
After determine the region, replica controller need to determine which node
is in this selected region.
|| Pseudocode of file read and dynamic adjustment process
Since replica controller has history information, it can record the location
of users who send request as Vr, Vr = Vr1∪Vr2∪
... Vrh ... ∪Vrk, Vrh is node set of all
the requests in Vh. Grh(Vrh, E) is graph constituted
by Vrh and link between nodes. And distribution of all copies in
HDFS file system can be informed by reading the name of the node in the BlockMap.
By doing this, we can get VR, VR = VR1 ∪VR2
∪VR3 ... ∪VRh ... VRk, VRh,
VR, is set of nodes which own copies of file. GRh(VRh,
E) is the coverage of VRh. In summary:
The selection process of deleting file replica is similar to that of adding
nodes. What need to do is change the value of
We evaluated the proposed method of job classification and dynamic replica
management in a private cluster with 1namenode and 20 data nodes. The name nodes
are run on the server with Ubuntu 10.0.4, two Intel Xeon E5520 CPUs, 2.26 GHz,
12 GB memory and 2TB SATA disk. The data nodes are run on a personal computer
with Ubuntu 10.0.4, Intel Xeon E5420 CPU, 2.50 GHz, 8 GB memory and 300 GB SATA
disk. The Java version is 1.6.0_24. These nodes locate in three different racks
with Gigabit Ethernet network connecting.
Experiments of job type classification: The aim of the first experiment
is to evaluate our analysis on generalization of workload characteristics. The
input data is 512 MB for each job. Since the default size of block is 64 MB,
the size of input data leads eight map tasks for each job and we manually set
eight reduce tasks.
We have verified the similarity in the characteristic of tasks in an individual
benchmark. For simplicity, 5 representative applications among 19 benchmarks
is presented. Those 5 benchmarks are, respectively Word Count, Grep, Sort, Terasort
and Crypto. The selection is based on their relativeness with CR. Namely while
the latter three can represent I/O intensive, the first two are relatively more
CPU intensive. We separately ran each of these 5 benchmarks in isolation to
identify the computing rate and the execution time of each (map/reduce) task.
The result shows that there is no significant variation for tasks belonging
to the same job in terms of both CR and execution time. The variation of reduce
tasks tends to be greater than that of map tasks, which is because that the
input data of reduce tasks and outputted by map tasks are probably not partitioned
evenly. However, the variation of execution time for map tasks and reduce tasks
in the same job still remains in a very similar level. Therefore, we can use
the characteristic of a task to represent the characteristic of the rest of
According to the research of Hadoop benchmark provided by Intel, some characteristics
of benchmark program workload are shown in Table 2.
From the comparison of Fig. 2 and Table 1,
the proposed method according to computing rate can be used for job classification.
The results are basically in line with Intels
Experiments of dynamic replica management: TeraSort is a standard map/reduce
sort, except for a custom partitioner that ensures that all of the keys in reduce
N are after all of the keys in reduce.
|| Characteristics of benchmark program workload
|| Characteristic analysis for 6 typical benchmarks
This is a requirement of the contest so that the output of the sort is totally
ordered; even if it is divided up by reduce. Experimental data is generated
by TeraGen. It generates input data for the sort that is byte for byte equivalent
to the C version that was released in March of 2009, including specific keys
and values. It divides the desired number of rows by the desired number of tasks
and assigns ranges of rows to each map. The map jumps the random number generator
to the correct value for the first row and generates the following rows.
TeraSort uses Hadoop's MapReduce mechanism to achieve Sort. With the perfect
combination of Hadoop mechanism, TeraSort has been a superior sort program.
So it can be used for testing Hadoop in cluster due to its high test value.
From experiment 1, TeraSort can be classified as I/O intensive job in reduce
phase. The amount of experimental data is 10 GB. The amount of map task can
be adjusted by mapred.min.split.size
and the amount of reduce task can be adjusted by mapred.reduce.tasks.
To simulate reality in different time periods of different users simultaneously
access to the same data, the number of map task and reduce task can be adjusted.
First, we maintain the amount of map task equals 16 and change the amount of
reduce task. Moreover, make a comparison of execution time between HDFS default
copy strategy and the dynamic replica adjustment method. The execution time
of map and reduce under HDFS default copy strategy and under the dynamic replica
adjustment method are shown in Fig. 3 and 4,
From the analysis of Fig. 3 and 4, we can
draw such conclude that: when the number of reduce task is less than 15, the
total time and reduce time are inversely proportional to the amount of reduce
task. When the number of reduce task is greater than 15, total time and reduce
time remain substantially constant. The amount of reduce task should be close
to that of slave nodes and slightly larger than that of nodes. There is no apparent
relationship between execution time of Map and the amounts of reduce task.
Compared to HDFS original copy strategy, dynamic replica policy has a significant
performance improvement, in particular when reduce task number is greater than
15. But there is no significant decline of execution time when rapid growth
of the number of users occurs. The most suitable number of reduce task should
be 0.95 or 1.75x(number of datanodexmapred.tasktracker. tasks.maximum).
If the number of tasks is 0.95 times as many as that of nodes all reduce tasks
can be launched at the same time after the end of the map task output transmission.
||Execution time of map and reduce under HDFS default copy strategy
||Execution time of map and reduce under the dynamic replica
If the number of tasks is 1.75 times as many as that of nodes, then fast node
will launch the second batch reduce tasks after finishing of the first batch
reduce tasks, it is more favorable load balancing. As can be seen from Fig.
3 and 4, the proposed method works well when there are
15 (nearly 0.95 times), 25 and 30 (nearly 1.75 times), programs execution
time has an apparently decline. In other words, the performance of the Hadoop
can be significantly improved with reasonable adjustment of the number of Job
Moreover, a dynamic parameter adjustment method was achieved by script. In
the initial stage, set 5 reduce tasks, set 20 reduce tasks from time t1, set
40 reduce tasks from time t2, set 60 reduce tasks from time t3, generate a random
number between 10-60 from time t4. The contrast of the proposed dynamic replica
adjustment method and HDFS default replica policy is shown in Fig.
4, last row.
CONCLUSION AND FUTURE WORK
The replica technique is widely used in the storage system, which can ensure
system stability, enhance the rate of user access to files, as well as improve
the security of data and other characteristics. HDFS, which uses the form of
a copy of the file block to store larger files, is a common distributed file
system. It has a default replica mechanism, yet is static. When many users request
for the same particular file at the same time, this file will be the so-called
hot file, as a result, system performance will have a sudden drop. Users
QoS will have a corresponding decline. Therefore, it is necessary to use a dynamic
replica scheduling mechanism to guarantee system performance and Users
QoS. In this paper, a job type classification method and a dynamic replica manage
mechanism based on HDFS are proposed. I/O intensive jobs will be picked up first
and then uses the dynamic replica manage mechanism to determine whether to increase
or decrease the number of copies on appropriate datanode. Experiment result
shows that the proposed method can improve HDFS performance effectively.
There are still some limitations in this proposed method. Its basic idea is
sacrifice storage space to reduce users access delay time. However, this
mechanism will not be activated until large amounts of file requests occur.
And the number of file replica will have a corresponding decline when the number
of user request decreases. Furthermore, the prices of the storage resources
are relative lower in the real cloud environment. Therefore, the proposed method
is acceptable in real cloud environment.
This study was supported by the National Natural Science Foundation of China
(Grant No. 61202163, 61240035); Natural Science Foundation of Shanxi Province
(Grant No. 2012011015-1). This work was also supported in part by the US National
Science Foundation (NSF) under Grant No. CNS-1016320 and CCF-0829993. This work
is also supported by Scientific Research Program Funded by Shaanxi Provincial
Education Department (Program No.2013JK1139).