各task自行同步线程启动数量,当所有线程都启动之后,输出收集器开始运作:
long now = System.currentTimeMillis(); if (now > this.start + this.interval) { this.start += this.interval; context.collect(new LongWritable(this.start), new LongWritable( this.writeBlocks)); } 。。。。。。 |
可以看到key是时间戳,value是我们想收集的数值,收集器收集到的数据将进一步提供给Reducer来分析,这里有一个压力测试的关键点,即最大并发开始时间点和结束时间点的判断。观察Reducer类的reduce方法:
public void reduce(Text key, Iterator<LongWritable> value, OutputCollector<Text, Text> context, Reporter reporter) |
由于所有map都以相同的时间戳作为key,因此同一时刻迭代器value的size代表了有多少个map已经达到了最大并发度,我们判断这个size,当其与我们预期的map总数一致时,则可以将该时间戳作为最大并发压力的开始时间点,当size开始小于预期map总数时,则代表最大并发压力的结束时间点,测试结果分析时可以掐取这一段数据作为测试结果,免去开始准备阶段和快结束阶段压力变小对测试结果的干扰。
更进一步我们可以在hdfs上设计一个标志位,当一个maptask执行完毕之后,通过该标志位通知到其他所有map task,以便快速结束当前的测试。
原文转自:http://www.taobaotest.com/blogs/2515