Storm-源码分析-Topology Submit-Task

mk-task, 比较简单, 因为task只是概念上的结构, 不象其他worker, executor都需要创建进程或线程 
所以其核心其实就是mk-task-data, 
1. 创建TopologyContext对象, 其实就是把之前的topology对象和worker-data混合到一起, 便于task在执行时可以取到需要的topology信息. 
2. 创建task-object, spout-object或bolt-object, 封装相应的逻辑, 如nextTuple, execute 
3. 生成tasks-fn, 名字起的不好,让人误解执行了task的功能, 其实就是做些emit之间的准备工作, 其中最重要的就是调用grouper去产生targets task, 当然还包含些metrics, hooks的调用.

说白了其实mk-tasks, 没做啥事

(defn mk-task [executor-data task-id]
  (let [task-data (mk-task-data executor-data task-id)  ;;1 mk-task-data
        storm-conf (:storm-conf executor-data)]
    (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)] ;; add预定义的hooks
      (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
    ;; when this is called, the threads for the executor haven't been started yet,
    ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
    (send-unanchored task-data SYSTEM-STREAM-ID ["startup"])  ;;向SYSTEM-STREAM, 发送startup通知,谁会接收SYSTEM-STREAM…?
    task-data
    ))

 

1 mk-task-data

(defn mk-task-data [executor-data task-id]
  (recursive-map
    :executor-data executor-data
    :task-id task-id
    :system-context (system-topology-context (:worker executor-data) executor-data task-id)
    :user-context (user-topology-context (:worker executor-data) executor-data task-id)
    :builtin-metrics (builtin-metrics/make-data (:type executor-data))
    :tasks-fn (mk-tasks-fn <>)
    :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))

1.1 TopologyContext

Storm-源码分析-Topology Submit-Task-TopologyContext

:system-context, :user-context, 只是context中的topology对象不同, system为system-topology!

1.2 builtin-metrics/make-data

这里的builtin-metrics用来记录spout或bolt的执行状况的metrics

Storm-源码分析- metric

1.3 mk-tasks-fn

返回tasks-fn, 这个函数主要用于做emit之前的准备工作, 返回target tasks list 
1. 调用grouper, 产生target tasks 
2. 执行emit hook 
3. 满足sampler条件时, 更新stats和buildin-metrics

task-fn, 两种不同参数版本

[^String stream ^List values], 这个版本好理解些, 就是将stream对应的component的target tasks都算上(一个stream可能有多个out component, 一份数据需要发到多个bolt处理)

[^Integer out-task-id ^String stream ^List values], 指定out-task-id, 即direct grouping 
这里对out-task-id做了验证 
out-task-id (if grouping out-task-id), 即out-task-id->component->grouper不为nil(为:direct?), 即验证这个stream确实有到该out-task-id对应component 
如果验证失败, 将out-task-id置nil

(defn mk-tasks-fn [task-data]
  (let [task-id (:task-id task-data)
        executor-data (:executor-data task-data)
        component-id (:component-id executor-data)
        ^WorkerTopologyContext worker-context (:worker-context executor-data)
        storm-conf (:storm-conf executor-data)
        emit-sampler (mk-stats-sampler storm-conf)
        stream->component->grouper (:stream->component->grouper executor-data) ;;Storm-源码分析-Streaming Grouping
        user-context (:user-context task-data)
        executor-stats (:stats executor-data)
        debug? (= true (storm-conf TOPOLOGY-DEBUG))]

    (fn ([^Integer out-task-id ^String stream ^List values]
          (when debug?
            (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
          (let [target-component (.getComponentId worker-context out-task-id)
                component->grouping (get stream->component->grouper stream)
                grouping (get component->grouping target-component)
                out-task-id (if grouping out-task-id)]
            (when (and (not-nil? grouping) (not= :direct grouping))
              (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
            (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
            (when (emit-sampler)
              (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
              (stats/emitted-tuple! executor-stats stream)
              (if out-task-id
                (stats/transferred-tuples! executor-stats stream 1)
                (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))
            (if out-task-id [out-task-id])
            ))
        ([^String stream ^List values]
           (when debug?
             (log-message "Emitting: " component-id " " stream " " values))
           (let [out-tasks (ArrayList.)]
             (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
               (when (= :direct grouper)
                  ;;  TODO: this is wrong, need to check how the stream was declared
                  (throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
               (let [comp-tasks (grouper task-id values)] ;;执行grouper, 产生target tasks
                 (if (or (sequential? comp-tasks) (instance? Collection comp-tasks))
                   (.addAll out-tasks comp-tasks)
                   (.add out-tasks comp-tasks)
                   )))
             (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) ;;执行事先注册的emit hook
             (when (emit-sampler)    ;;满足抽样条件时, 更新stats和buildin-metrics中的emitted和transferred metric
               (stats/emitted-tuple! executor-stats stream)
               (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
               (stats/transferred-tuples! executor-stats stream (count out-tasks))
               (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream (count out-tasks)))
             out-tasks)))
    ))

1.4 get-task-object

取出component的对象, 
比如对于Spout, 取出SpoutSpec中的ComponentObject spout_object, 包含了spout的逻辑, 比如nextTuple()

(defn- get-task-object [^TopologyContext topology component-id]
  (let [spouts (.get_spouts topology)
        bolts (.get_bolts topology)
        state-spouts (.get_state_spouts topology)
        obj (Utils/getSetComponentObject
             (cond
              (contains? spouts component-id) (.get_spout_object ^SpoutSpec (get spouts component-id))
              (contains? bolts component-id) (.get_bolt_object ^Bolt (get bolts component-id))
              (contains? state-spouts component-id) (.get_state_spout_object ^StateSpoutSpec (get state-spouts component-id))
              true (throw-runtime "Could not find " component-id " in " topology)))
        obj (if (instance? ShellComponent obj)
              (if (contains? spouts component-id)
                (ShellSpout. obj)
                (ShellBolt. obj))
              obj )
        obj (if (instance? JavaObject obj)
              (thrift/instantiate-java-object obj)
              obj )]
    obj
    ))

本文章摘自博客园,原文发布日期:2013-07-31
时间: 2024-10-23 06:14:35

Storm-源码分析-Topology Submit-Task的相关文章

Spark源码分析之六:Task调度(二)

        话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: // Make fake resource offers on all executors     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)     private def makeOffers() {  

Spark源码分析之七:Task运行(一)

        在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq

Spark源码分析之八:Task运行(二)

        在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤:         Step1:Task及其运行时需要的辅助对象构造,主要包括:                        1.当前线程设置上下文类加载器:        

Spark源码分析之五:Task调度(一)

        在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段:         1.Job的调度模型与运行反馈:         2.Stage划分:         3.Stage提交:对应TaskSet的生成.         Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分

Spark源码分析之九:内存管理模型

        Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Spark内存管理模型的神秘面纱.         我们在<Spark源码分析之七:Task运行(一)>一文中曾经提到过,在Task被传递到Executor上去执行时,在为其分配的TaskRunner线程的run()方法内,在Task真正运行之前,我们就要构造一个任务内存管理器Task

Spark 源码分析 -- task实际执行过程

Spark源码分析 – SparkContext 中的例子, 只分析到sc.runJob 那么最终是怎么执行的? 通过DAGScheduler切分成Stage, 封装成taskset, 提交给TaskScheduler, 然后等待调度, 最终到Executor上执行 val sc = new SparkContext(--) val textFile = sc.textFile("README.md") textFile.filter(line => line.contains(

Storm源码结构 (来源Storm Github Wiki)

写在前面 本文译自Storm Github Wiki: Structure of the codebase,有助于深入了解Storm的设计和源码学习.本人也是参照这个进行学习的,觉得在理解Storm设计的过程中起到了重要作用,所以也帖一份放在自己博客里.以下的模块分析里没有包括Storm 0.9.0增加的Netty模块,对应的代码包在Storm Github下的storm-netty文件夹内,内容比较简单,关于这块的release note可以参考Storm 0.9.0 Released Net

Storm-源码分析-Topology Submit-Nimbus

Nimbus Server Nimbus server, 首先从启动命令开始, 同样是使用storm命令"storm nimbus"来启动  看下源码, 此处和上面client不同, jvmtype="-server", 最终调用"backtype.storm.daemon.nimbus"的main  nimbus是用clojure实现的, 但是clojure是基于JVM的, 所以在最终发布的时候会产生nimbus.class, 所以在用户使用的

深入理解Spark:核心思想与源码分析

大数据技术丛书 深入理解Spark:核心思想与源码分析 耿嘉安 著 图书在版编目(CIP)数据 深入理解Spark:核心思想与源码分析/耿嘉安著. -北京:机械工业出版社,2015.12 (大数据技术丛书) ISBN 978-7-111-52234-8 I. 深- II.耿- III.数据处理软件 IV. TP274 中国版本图书馆CIP数据核字(2015)第280808号 深入理解Spark:核心思想与源码分析 出版发行:机械工业出版社(北京市西城区百万庄大街22号 邮政编码:100037)

线程池源码分析-FutureTask

1 系列目录 线程池接口分析以及FutureTask设计实现 线程池源码分析-ThreadPoolExecutor 该系列打算从一个最简单的Executor执行器开始一步一步扩展到ThreadPoolExecutor,希望能粗略的描述出线程池的各个实现细节.针对JDK1.7中的线程池 2 Executor接口说明 Executor执行器,就是执行一个Runnable任务,可同步可异步,接口定义如下: public interface Executor { /** * Executes the g