Investigating Missing Tasks, Jobs, and Stages in the Spark History Server


Introduction

  Recently I noticed that the Spark UI loses some metrics while an application is running, and that opening some applications from the History Server shows missing tasks, jobs, and even entire stages. This post digs into the source code to find out why. The Spark source referenced here is version 2.3.0.

Root Cause Investigation

  Missing stages are the easiest symptom to pin down, so the exploration below focuses on stages first and then extends to missing jobs and tasks.

eventlog

  Inspecting the event log shows that stages go missing because the corresponding events really are missing: the SparkListenerStageSubmitted or SparkListenerStageCompleted events were never written to the event log as JSON. After manually reconstructing the missing events, the History Server displays those stages correctly.
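
  As a rough illustration of that manual reconstruction, the sketch below builds a SparkListenerStageCompleted event and serializes it with Spark's own JsonProtocol (the same helper EventLoggingListener uses), producing one line that can be appended to the event log file. JsonProtocol is private[spark], so the sketch assumes it is compiled under an org.apache.spark subpackage; the stage ID, name, and task count are hypothetical placeholders for whatever the lost stage actually contained.

// Rebuild a lost stage-completed event as an event log JSON line.
// JsonProtocol is private[spark], hence the package choice below.
package org.apache.spark.repair

import org.apache.spark.scheduler.{SparkListenerStageCompleted, StageInfo}
import org.apache.spark.util.JsonProtocol
import org.json4s.jackson.JsonMethods.{compact, render}

object RebuildStageEvent {
  def main(args: Array[String]): Unit = {
    // Placeholder stage metadata -- substitute the real values, e.g. from
    // the surviving SparkListenerStageSubmitted event if you have one.
    val stageInfo = new StageInfo(
      42, 0, "map at Example.scala:10", 8, Seq.empty, Seq.empty, "")
    val event = SparkListenerStageCompleted(stageInfo)
    // Each event log line is one compact JSON object.
    println(compact(render(JsonProtocol.sparkEventToJson(event))))
  }
}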

driver log

  The next step is to find out why these events go missing. Looking through the driver log, we find the following errors:

18/10/18 15:38:51 ERROR AsyncEventQueue: Dropping event from queue eventLog. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.

18/10/18 18:09:58 ERROR AsyncEventQueue: Dropping event from queue appStatus. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.

  These errors show that not only were events in the eventLog queue dropped, but some events in the appStatus queue were dropped as well. The appStatus queue feeds the listeners that back the Spark UI while the application is running, which likely explains why some metrics disappear from the live UI. Let's dig into the source next.
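
  For context on those queue names: in Spark 2.3 the LiveListenerBus maintains several named AsyncEventQueue instances, and each listener is attached to one of them, so a slow listener only backs up (and drops events from) its own queue. A minimal sketch of how listeners end up on those queues, assuming an org.apache.spark subpackage since LiveListenerBus is private[spark]:

package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{LiveListenerBus, SparkListener}

object QueueDemo {
  def main(args: Array[String]): Unit = {
    val bus = new LiveListenerBus(new SparkConf())
    // Each call attaches the (no-op) listener to a separate named queue:
    bus.addToStatusQueue(new SparkListener {})   // "appStatus": backs the live UI
    bus.addToEventLogQueue(new SparkListener {}) // "eventLog": feeds EventLoggingListener
  }
}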

EventLoggingListener.scala

  EventLoggingListener is responsible for persisting events to the event log. We instrument the stage-related event handlers with log statements to check whether the listener actually receives these events:

// Events that do not trigger a flush
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
  logError("$$$ In onStageSubmitted StageSubmitted Stage ID: " + event.stageInfo.stageId)
  logEvent(event)
}

// Events that trigger a flush
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
  logError("$$$ In onStageCompleted StageCompleted Stage ID: " + event.stageInfo.stageId)
  logEvent(event, flushLogger = true)
}

  Checking the driver log, we confirm that the listener indeed never received some of these events, which is why they were never written to the event log.

AsyncEventQueue.scala

  The error messages point to AsyncEventQueue: some events were dropped because a listener could not keep up. In AsyncEventQueue.scala, events are buffered in an eventQueue, and enqueueing happens in the post function:

def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  eventCount.incrementAndGet()
  // Try to enqueue the event; offer() returns false when the queue is full.
  if (eventQueue.offer(event)) {
    return
  }

  // The queue was full: roll back the count and record the drop.
  eventCount.decrementAndGet()
  droppedEvents.inc()
  droppedEventsCounter.incrementAndGet()
  if (logDroppedEvent.compareAndSet(false, true)) {
    // Only log the following message once to avoid duplicated annoying logs.
    logError(s"Dropping event from queue $name. " +
      "This likely means one of the listeners is too slow and cannot keep up with " +
      "the rate at which tasks are being started by the scheduler.")
  }
  logTrace(s"Dropping event $event")

  val droppedCount = droppedEventsCounter.get
  if (droppedCount > 0) {
    // Don't log too frequently
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        val previous = new java.util.Date(prevLastReportTimestamp)
        logTrace(s"Dropped $droppedCount events from $name since $previous.")
      }
    }
  }
}

  With the code in view, it is easy to see why the events were dropped: when post manages to enqueue the event into eventQueue, it returns immediately, and the drop logic below only runs when offer fails. In other words, the eventQueue was full and new events simply could not get in.
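
  To make that dropping branch concrete, here is a tiny standalone demonstration (not Spark code) of the underlying behavior: LinkedBlockingQueue.offer returns false instead of blocking once the queue is at capacity, which is exactly the case in which post drops the event.

import java.util.concurrent.LinkedBlockingQueue

object OfferDemo {
  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[String](2) // capacity of 2
    println(queue.offer("event-1")) // true: enqueued
    println(queue.offer("event-2")) // true: enqueued, queue is now full
    println(queue.offer("event-3")) // false: full, this event would be dropped
  }
}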

Verification

  Let's verify this by printing the dropped events and comparing them with the missing events we logged in EventLoggingListener.
  We raise the log level of the dropped-event message so it actually appears, then check the driver log:

  We can see that a large number of events were dropped, most of them SparkListenerExecutorMetricsUpdate events, which again explains why the live Spark UI loses some metrics. Since the output is too verbose, we add a filter so that only dropped stage events are logged:

if (event.isInstanceOf[SparkListenerStageCompleted]
    || event.isInstanceOf[SparkListenerStageSubmitted]) {
  val id = if (event.isInstanceOf[SparkListenerStageSubmitted]) {
    event.asInstanceOf[SparkListenerStageSubmitted].stageInfo.stageId
  } else {
    event.asInstanceOf[SparkListenerStageCompleted].stageInfo.stageId
  }

  logError(s"Dropping event $event StageId: $id")
}

  Checking the log, the dropped stage events match exactly the missing ones we identified earlier:

Solution

  Once we know why events are dropped, the fix is straightforward: make the queue bigger. Here is where eventQueue is initialized:

// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
  conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

  Consulting the configuration docs, LISTENER_BUS_EVENT_QUEUE_CAPACITY is set via spark.scheduler.listenerbus.eventqueue.capacity (spark.scheduler.listenerbus.eventqueue.size is the deprecated pre-2.3 name) and defaults to 10000. After raising it to 100000, the History Server displays everything: no stages are missing, and no events are dropped.
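
  For example, the capacity can be raised on the SparkConf when building the application (the application name below is just a placeholder), or equivalently passed with --conf on spark-submit:

import org.apache.spark.SparkConf

object SubmitWithBigQueue {
  def main(args: Array[String]): Unit = {
    // Equivalent to passing
    //   --conf spark.scheduler.listenerbus.eventqueue.capacity=100000
    // to spark-submit. Note the larger queue costs extra driver memory.
    val conf = new SparkConf()
      .setAppName("example-app") // placeholder name
      .set("spark.scheduler.listenerbus.eventqueue.capacity", "100000")
  }
}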

Conclusion

  In the end, our fix uses a Spark-provided parameter to statically enlarge the queue at initialization time, at the cost of some extra driver memory. An alternative would be to grow the queue dynamically when it overflows, but that requires a custom implementation, which we have not done here.
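
  For the curious, here is one possible direction for that dynamic alternative, purely as an illustrative sketch (this is not Spark code and not what the post implemented): back the queue with an unbounded LinkedBlockingQueue and only warn past a soft limit, trading the risk of dropped events for the risk of unbounded driver memory growth.

import java.util.concurrent.LinkedBlockingQueue

// Illustrative only: a queue that never drops, warning past a soft limit.
class SoftBoundedQueue[T](softLimit: Int) {
  private val queue = new LinkedBlockingQueue[T]() // unbounded

  def post(event: T): Boolean = {
    if (queue.size() > softLimit) {
      System.err.println(s"Queue is past soft limit $softLimit; driver memory may grow")
    }
    queue.offer(event) // always succeeds on an unbounded queue
  }

  def take(): T = queue.take()
}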




