Como o Spark envia fechamentos para trabalhadores?

9

Quando escrevo uma transformação RDD, por ex.

val rdd = sc.parallelise(1 to 1000) 
rdd.map(x => x * 3)

Eu entendo que o fechamento ( x => x * 3 ) que é simplesmente uma Function1 precisa ser Serializable e eu acho que li em algum lugar EDIT: está bem aí implícito na documentação: link que é "enviado" para os trabalhadores para execução. (por exemplo, Akka enviando uma "parte executável do código" pela rede para os trabalhadores executarem)

É assim que funciona?

Alguém em um encontro que participei comentou e disse que não está realmente enviando nenhum código serializado, mas como cada trabalhador obtém uma "cópia" do jar de qualquer forma, ele só precisa de uma referência para qual função executar ou algo assim (mas não tenho certeza se cito essa pessoa corretamente)

Agora estou com uma confusão total sobre como isso realmente funciona.

Então minhas perguntas são

  1. como os fechamentos de transformação são enviados aos trabalhadores? Serializado via akka? ou eles "já estão lá" porque faíscas enviam o jarro inteiro para cada trabalhador (parece improvável para mim ...)

  2. se sim, então como o resto do jarro é enviado para os trabalhadores? é isso que o "cleanupClosure" está fazendo? por exemplo. enviando apenas o bytecode relevante para o trabalhador em vez de todo o uberjar? (por exemplo, apenas código dependente para o fechamento?)

  3. Então, para resumir, o Spark, a qualquer momento, sincroniza os jars no caminho de classe --jars com os trabalhadores de alguma forma? ou envia "a quantidade certa" de código para os trabalhadores? e se enviar fechamentos, eles estão sendo armazenados em cache para a necessidade de recálculo? ou envia o encerramento com a tarefa toda vez que uma tarefa é agendada? desculpe se isso é bobagem, mas eu realmente não sei.

Por favor, adicione fontes se você puder para a sua resposta, eu não poderia encontrá-lo explícito na documentação, e eu sou muito cauteloso para tentar concluí-lo apenas lendo o código.

    
por Eran Medan 14.08.2015 в 19:22
fonte

1 resposta

2

Os fechamentos são certamente serializados em tempo de execução. Eu tenho muitas ocorrências vistas exceções do Closure Not Serializable em tempo de execução - do pyspark e do scala. Existe um código complexo chamado

De ClosureCleaner.scala

def clean(
    closure: AnyRef,
    checkSerializable: Boolean = true,
    cleanTransitively: Boolean = true): Unit = {
  clean(closure, checkSerializable, cleanTransitively, Map.empty)
}

que tenta minimizar o código que está sendo serializado. O código é então enviado através do fio - se fosse serializável. Caso contrário, uma exceção será lançada.

Aqui está outro trecho do ClosureCleaner para verificar a capacidade de serializar uma função de entrada:

  private def ensureSerializable(func: AnyRef) {
    try {
      if (SparkEnv.get != null) {
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
      }
    } catch {
      case ex: Exception => throw new SparkException("Task not serializable", ex)
    }
  }
    
por javadba 15.08.2015 / 03:31
fonte