DisrutporSupport.scala

package wechaty.padplus.support

import java.util.concurrent.{Executors, TimeUnit}

import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BusySpinWaitStrategy, EventFactory, EventHandler, ExceptionHandler}
import com.typesafe.scalalogging.LazyLogging
import wechaty.puppet.events.EventEmitter
import wechaty.puppet.schemas.Puppet.PuppetEventName
import wechaty.puppet.schemas.Puppet.PuppetEventName.Type

/**
  *
  * @author <a href="mailto:jcai@ganshane.com">Jun Tsai</a>
  * @since 2020-06-30
  */
trait DisrutporSupport extends EventEmitter {
  self :LazyLogging =>
  private class EventDef {
    var eventName: PuppetEventName.Type = _
    var value    : Any = _
  }
  private var disruptor:Disruptor[EventDef] = _
  protected def startDisruptor(): Unit ={
    disruptor = new Disruptor[EventDef](
      new EventFactory[EventDef]() {
        @Override
        def newInstance():EventDef= {
          new EventDef()
        }
      },
      1 << 32,
      Executors.defaultThreadFactory(),
      ProducerType.SINGLE,
      new BusySpinWaitStrategy()
    )
    disruptor.handleEventsWith(new EventHandler[EventDef](){
      override def onEvent(event: EventDef, sequence: Long, endOfBatch: Boolean): Unit = {
        logger.debug("emit event using super {}",event.eventName)
        DisrutporSupport.super.emit(event.eventName,event.value)
      }
    })
    disruptor.setDefaultExceptionHandler(new ExceptionHandler[EventDef] {
      override def handleEventException(ex: Throwable, sequence: Long, event: EventDef): Unit = {
        logger.error(ex.getMessage,ex)
      }

      override def handleOnStartException(ex: Throwable): Unit = {
        throw ex
      }

      override def handleOnShutdownException(ex: Throwable): Unit = {
        logger.error(ex.getMessage,ex)
      }
    })
    disruptor.start()
  }
  override def emit[T](eventName: Type, value: T): Unit = {
    logger.debug("publish event {}",eventName)
    disruptor.publishEvent((event: EventDef, sequence: Long) => {
      event.eventName = eventName
      event.value = value
    })
  }
  protected def shutdownDisruptor(): Unit ={
    disruptor.shutdown(5,TimeUnit.SECONDS);
  }
}