GrpcEventSupport.scala
package wechaty.hostie.support
import com.typesafe.scalalogging.LazyLogging
import io.github.wechaty.grpc.puppet.Event.{EventResponse, EventType}
import io.grpc.stub.StreamObserver
import wechaty.puppet.events.EventEmitter
import wechaty.puppet.schemas.Event._
import wechaty.puppet.schemas.Puppet
import wechaty.puppet.schemas.Puppet.PuppetEventName
/**
*
* @author <a href="mailto:jcai@ganshane.com">Jun Tsai</a>
* @since 2020-06-02
*/
trait GrpcEventSupport extends StreamObserver[EventResponse] {
self: LazyLogging with EventEmitter with GrpcSupport with ContactRawSupport with MessageRawSupport =>
private[wechaty] var idOpt: Option[String] = None
override def onNext(v: EventResponse): Unit = {
try {
if (v.getType != EventType.EVENT_TYPE_HEARTBEAT) {
val hearbeat = new EventHeartbeatPayload
hearbeat.data = "onGrpcStreamEvent(%s)".format(v.getType)
emit(PuppetEventName.HEARTBEAT, hearbeat)
}
v.getType match {
case EventType.EVENT_TYPE_UNSPECIFIED =>
logger.error("PuppetHostie onGrpcStreamEvent() got an EventType.EVENT_TYPE_UNSPECIFIED ")
case other =>
val payload = processEvent(other, v.getPayload)
val eventName = Puppet.pbEventType2PuppetEventName.getOrElse(other, throw new IllegalAccessException("unsupport event " + other))
emit(eventName, payload)
}
} catch {
case e: Throwable =>
logger.error("Grpc onNext", e)
}
}
private def processEvent(eventType: EventType, data: String): EventPayload = {
logger.debug("receive event:{},data:{}", eventType, data)
eventType match {
case EventType.EVENT_TYPE_SCAN =>
Puppet.objectMapper.readValue(data, classOf[EventScanPayload])
case EventType.EVENT_TYPE_DONG =>
Puppet.objectMapper.readValue(data, classOf[EventDongPayload])
case EventType.EVENT_TYPE_ERROR =>
Puppet.objectMapper.readValue(data, classOf[EventErrorPayload])
case EventType.EVENT_TYPE_HEARTBEAT =>
Puppet.objectMapper.readValue(data, classOf[EventHeartbeatPayload])
case EventType.EVENT_TYPE_FRIENDSHIP =>
Puppet.objectMapper.readValue(data, classOf[EventFriendshipPayload])
case EventType.EVENT_TYPE_LOGIN =>
val value = Puppet.objectMapper.readValue(data, classOf[EventLoginPayload])
idOpt = Some(value.contactId)
value
case EventType.EVENT_TYPE_LOGOUT =>
idOpt = None
Puppet.objectMapper.readValue(data, classOf[EventLogoutPayload])
case EventType.EVENT_TYPE_MESSAGE =>
Puppet.objectMapper.readValue(data, classOf[EventMessagePayload])
case EventType.EVENT_TYPE_READY =>
Puppet.objectMapper.readValue(data, classOf[EventReadyPayload])
case EventType.EVENT_TYPE_ROOM_INVITE =>
Puppet.objectMapper.readValue(data, classOf[EventRoomInvitePayload])
case EventType.EVENT_TYPE_ROOM_JOIN =>
Puppet.objectMapper.readValue(data, classOf[EventRoomJoinPayload])
case EventType.EVENT_TYPE_ROOM_LEAVE =>
Puppet.objectMapper.readValue(data, classOf[EventRoomLeavePayload])
case EventType.EVENT_TYPE_ROOM_TOPIC =>
Puppet.objectMapper.readValue(data, classOf[EventRoomTopicPayload])
case EventType.EVENT_TYPE_RESET =>
logger.warn("PuppetHostie onGrpcStreamEvent() got an EventType.EVENT_TYPE_RESET ?")
Puppet.objectMapper.readValue(data, classOf[EventResetPayload])
case other =>
throw new IllegalAccessException("event not supported ,event:" + other)
}
}
override def onError(throwable: Throwable): Unit = {
logger.error("Grpc onError", throwable)
logger.info("reconnect.....")
new Thread(() => {
reconnectStream()
}).run()
}
override def onCompleted(): Unit = {
logger.info("completed")
}
}