Spark Streaming de um ator

9

Gostaria que um ator consumidor se inscrevesse em um tópico do Kafka e transmitisse dados para processamento adicional com o Spark Streaming fora do consumidor. Por que um ator? Porque li que sua estratégia de supervisor seria uma ótima maneira de lidar com falhas do Kafka (por exemplo, reiniciar em caso de falha).

Eu encontrei duas opções:

  • O Java KafkaConsumer class: seu método poll() retorna um Map[String, Object] . Eu gostaria que um DStream fosse retornado como KafkaUtils.createDirectStream , e eu não sei como buscar o fluxo de fora do ator.
  • Estenda o atributo ActorHelper e use actorStream() , como mostrado neste exemplo . Esta última opção não exibe uma conexão com um tópico, mas com um soquete.

Alguém poderia me apontar na direção certa?

    
por wipman 21.02.2017 в 09:44
fonte

1 resposta

2

Para lidar com falhas do Kafka, usei a estrutura do Apache Curator e a seguinte solução alternativa:

val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient

/**
  * This method returns false if kafka or zookeeper is down.
  */ 
def isKafkaAvailable:Boolean = 
   Try {
      if (zk.isConnected) {
        val xs = client.getChildren.forPath("/brokers/ids")
        xs.size() > 0
      }
      else false
    }.getOrElse(false)

Para consumir tópicos do Kafka, usei a biblioteca com.softwaremill.reactivekafka . Por exemplo:

class KafkaConsumerActor extends Actor {
   val kafka = new ReactiveKafka()
   val config: ConsumerProperties[Array[Byte], Any] = ... // see docs

   override def preStart(): Unit = {
      super.preStart()

      val publisher = kafka.consume(config)
      Source.fromPublisher(publisher)
            .map(handleKafkaRecord)
            .to(Sink.ignore).run()
   }

   /**
     * This method will be invoked when any kafka records will happen.
     */
   def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = {
      // handle record
   }
}
    
por John Mullins 01.03.2017 / 21:07
fonte