diff --git a/Gemfile.lock b/Gemfile.lock index feaa583..124deed 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -7,6 +7,7 @@ PATH GEM remote: https://rubygems.org/ specs: + coderay (1.1.0) colorize (0.5.8) coveralls (0.6.7) colorize @@ -15,8 +16,15 @@ GEM simplecov (>= 0.7) thor diff-lcs (1.2.4) + ffi (1.9.8-java) + method_source (0.8.2) mime-types (1.23) multi_json (1.7.7) + pry (0.10.1-java) + coderay (~> 1.1.0) + method_source (~> 0.8.1) + slop (~> 3.4) + spoon (~> 0.0) rake (10.0.4) redis (3.0.4) rest-client (1.6.7) @@ -33,6 +41,9 @@ GEM multi_json (~> 1.0) simplecov-html (~> 0.7.1) simplecov-html (0.7.1) + slop (3.6.0) + spoon (0.0.4) + ffi thor (0.18.1) PLATFORMS @@ -40,6 +51,7 @@ PLATFORMS DEPENDENCIES coveralls + pry redis redstorm! rspec (~> 2.13) diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index c3895ce..a0c6e9b 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -1,6 +1,7 @@ require 'java' require 'red_storm/configurator' require 'red_storm/environment' +require 'red_storm/dsl/output_fields' require 'pathname' java_import 'backtype.storm.tuple.Fields' @@ -14,6 +15,8 @@ class BoltError < StandardError; end class Bolt attr_reader :collector, :context, :config + include OutputFields + def self.java_proxy; "Java::RedstormStormJruby::JRubyBolt"; end # DSL class methods @@ -22,10 +25,6 @@ def self.log @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end - def self.output_fields(*fields) - @fields = fields.map(&:to_s) - end - def self.configure(&configure_block) @configure_block = block_given? ? configure_block : lambda {} end @@ -62,10 +61,18 @@ def unanchored_emit(*values) @collector.emit_tuple(Values.new(*values)) end + def unanchored_stream_emit(stream, *values) + @collector.emit_tuple_stream(stream, Values.new(*values)) + end + def anchored_emit(tuple, *values) @collector.emit_anchor_tuple(tuple, Values.new(*values)) end + def anchored_stream_emit(stream, tuple, *values) + @collector.emit_anchor_tuple_stream(stream, tuple, Values.new(*values)) + end + def ack(tuple) @collector.ack(tuple) end @@ -80,7 +87,21 @@ def execute(tuple) output = on_receive(tuple) if output && self.class.emit? values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output - values_list.each{|values| self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)} + values_list.each do |values| + if self.class.anchor? + if self.class.stream? + anchored_stream_emit(self.stream, tuple, *values) + else + anchored_emit(tuple, *values) + end + else + if self.class.stream? + unanchored_stream_emit(self.stream, *values) + else + unanchored_emit(*values) + end + end + end @collector.ack(tuple) if self.class.ack? end end @@ -97,10 +118,6 @@ def cleanup on_close end - def declare_output_fields(declarer) - declarer.declare(Fields.new(self.class.fields)) - end - def get_component_configuration configurator = Configurator.new configurator.instance_exec(&self.class.configure_block) @@ -113,10 +130,6 @@ def get_component_configuration def on_init; end def on_close; end - def self.fields - @fields ||= [] - end - def self.configure_block @configure_block ||= lambda {} end diff --git a/lib/red_storm/dsl/output_collector.rb b/lib/red_storm/dsl/output_collector.rb index d7a56e2..1cca8d1 100644 --- a/lib/red_storm/dsl/output_collector.rb +++ b/lib/red_storm/dsl/output_collector.rb @@ -6,4 +6,13 @@ class OutputCollector java_alias :emit_tuple, :emit, [java.lang.Class.for_name("java.util.List")] java_alias :emit_anchor_tuple, :emit, [Tuple.java_class, java.lang.Class.for_name("java.util.List")] + java_alias :emit_tuple_stream, :emit, [ + java.lang.String, + java.lang.Class.for_name("java.util.List") + ] + java_alias :emit_anchor_tuple_stream, :emit, [ + java.lang.String, + Tuple.java_class, + java.lang.Class.for_name("java.util.List") + ] end diff --git a/lib/red_storm/dsl/output_fields.rb b/lib/red_storm/dsl/output_fields.rb new file mode 100644 index 0000000..fb09d9e --- /dev/null +++ b/lib/red_storm/dsl/output_fields.rb @@ -0,0 +1,48 @@ +module RedStorm + module DSL + module OutputFields + + def self.included(base) + base.extend ClassMethods + end + + def declare_output_fields(declarer) + self.class.fields.each do |stream, fields| + declarer.declareStream(stream, Fields.new(fields)) + end + end + + def stream + self.class.stream + end + + module ClassMethods + + def output_fields(*fields) + @output_fields ||= Hash.new([]) + fields.each do |field| + case field + when Hash + field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] } + else + @output_fields['default'] |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s] + end + end + end + + def fields + @output_fields ||= Hash.new([]) + end + + def stream? + self.receive_options[:stream] && !self.receive_options[:stream].empty? + end + + def stream + self.receive_options[:stream] + end + end + end + end +end + diff --git a/lib/red_storm/dsl/spout.rb b/lib/red_storm/dsl/spout.rb index 4846742..14ab5e8 100644 --- a/lib/red_storm/dsl/spout.rb +++ b/lib/red_storm/dsl/spout.rb @@ -1,6 +1,7 @@ require 'java' require 'red_storm/configurator' require 'red_storm/environment' +require 'red_storm/dsl/output_fields' require 'pathname' module RedStorm @@ -11,6 +12,8 @@ class SpoutError < StandardError; end class Spout attr_reader :config, :context, :collector + include OutputFields + def self.java_proxy; "Java::RedstormStormJruby::JRubySpout"; end # DSL class methods @@ -23,10 +26,6 @@ def self.log @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end - def self.output_fields(*fields) - @fields = fields.map(&:to_s) - end - def self.on_send(*args, &on_send_block) options = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first @@ -126,10 +125,6 @@ def deactivate on_deactivate end - def declare_output_fields(declarer) - declarer.declare(Fields.new(self.class.fields)) - end - def ack(msg_id) on_ack(msg_id) end @@ -154,10 +149,6 @@ def on_deactivate; end def on_ack(msg_id); end def on_fail(msg_id); end - def self.fields - @fields ||= [] - end - def self.configure_block @configure_block ||= lambda {} end diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index 121bca2..f259522 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -4,6 +4,7 @@ java_import 'backtype.storm.topology.TopologyBuilder' java_import 'backtype.storm.generated.SubmitOptions' +java_import 'backtype.storm.utils.Utils' module RedStorm module DSL @@ -26,16 +27,45 @@ def initialize(component_class, constructor_args, id, parallelism) @constructor_args = constructor_args @id = id.to_s @parallelism = parallelism - @output_fields = [] + @output_fields = Hash.new([]) + + initialize_output_fields end - def output_fields(*args) - args.empty? ? @output_fields : @output_fields = args.map(&:to_s) + def output_fields(*fields) + default_fields = [] + fields.each do |field| + case field + when Hash + field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] } + else + default_fields |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s] + end + end + @output_fields[Utils::DEFAULT_STREAM_ID] = default_fields unless default_fields.empty? + + @output_fields end def is_java? @clazz.name.split('::').first.downcase == 'java' end + + private + + def initialize_output_fields + if @clazz.ancestors.include?(RedStorm::DSL::OutputFields) + @output_fields = @clazz.fields.clone + end + end + + def java_safe_fields + java_hash = java.util.HashMap.new() + @output_fields.each do |k, v| + java_hash.put(k, v.to_java('java.lang.String')) unless v.empty? + end + java_hash + end end class SpoutDefinition < ComponentDefinition @@ -47,7 +77,7 @@ def new_instance elsif is_java? @clazz.new(*constructor_args) else - Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields) + Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields) end end end @@ -60,29 +90,33 @@ def initialize(*args) @sources = [] end - def source(source_id, grouping) - @sources << [source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}] + def source(source_id, grouping, stream = Utils::DEFAULT_STREAM_ID) + @sources << [ + source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, + grouping.is_a?(Hash) ? grouping : {grouping => nil}, + stream.to_s + ] end def define_grouping(declarer) - @sources.each do |source_id, grouping| + @sources.each do |source_id, grouping, stream| grouper, params = grouping.first # declarer.fieldsGrouping(source_id, Fields.new()) case grouper when :fields - declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s)))) + declarer.fieldsGrouping(source_id, stream, Fields.new(*([params].flatten.map(&:to_s)))) when :global - declarer.globalGrouping(source_id) + declarer.globalGrouping(source_id, stream) when :shuffle - declarer.shuffleGrouping(source_id) + declarer.shuffleGrouping(source_id, stream) when :local_or_shuffle - declarer.localOrShuffleGrouping(source_id) + declarer.localOrShuffleGrouping(source_id, stream) when :none - declarer.noneGrouping(source_id) + declarer.noneGrouping(source_id, stream) when :all - declarer.allGrouping(source_id) + declarer.allGrouping(source_id, stream) when :direct - declarer.directGrouping(source_id) + declarer.directGrouping(source_id, stream) else raise("unknown grouper=#{grouper.inspect}") end @@ -96,7 +130,7 @@ def new_instance elsif is_java? @clazz.new(*constructor_args) else - Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields) + Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields) end end end diff --git a/redstorm.gemspec b/redstorm.gemspec index fe760b3..7c4ef68 100644 --- a/redstorm.gemspec +++ b/redstorm.gemspec @@ -21,5 +21,6 @@ Gem::Specification.new do |s| s.executables = ['redstorm'] s.add_development_dependency 'rspec', '~> 2.13' + s.add_development_dependency 'pry' s.add_runtime_dependency 'rake' end diff --git a/spec/red_storm/dsl/bolt_spec.rb b/spec/red_storm/dsl/bolt_spec.rb index 52b05a1..fc7ada6 100644 --- a/spec/red_storm/dsl/bolt_spec.rb +++ b/spec/red_storm/dsl/bolt_spec.rb @@ -45,21 +45,42 @@ class Bolt1 < RedStorm::SimpleBolt output_fields :f1 end bolt = Bolt1.new - Bolt1.send(:fields).should == ["f1"] + Bolt1.send(:fields).should == {"default" => ["f1"]} end it "should parse multiple arguments" do class Bolt1 < RedStorm::SimpleBolt output_fields :f1, :f2 end - Bolt1.send(:fields).should == ["f1", "f2"] + Bolt1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should parse string and symbol arguments" do class Bolt1 < RedStorm::SimpleBolt output_fields :f1, "f2" end - Bolt1.send(:fields).should == ["f1", "f2"] + Bolt1.send(:fields).should == {"default" => ["f1", "f2"]} + end + + it "should parse single hash argument" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :stream => :f1 + end + Bolt1.send(:fields).should == {"stream" => ["f1"]} + end + + it "should parse hash of string and symbols" do + class Bolt1 < RedStorm::SimpleBolt + output_fields "stream" => [:f1, :f2] + end + Bolt1.send(:fields).should == {"stream" => ["f1", "f2"]} + end + + it "should parse string and hash arguments" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :f1, :stream => :f2 + end + Bolt1.send(:fields).should == {"default" => ["f1"], "stream" => ["f2"]} end it "should not share state over mutiple classes" do @@ -69,9 +90,9 @@ class Bolt1 < RedStorm::SimpleBolt class Bolt2 < RedStorm::SimpleBolt output_fields :f2 end - RedStorm::SimpleBolt.send(:fields).should == [] - Bolt1.send(:fields).should == ["f1"] - Bolt2.send(:fields).should == ["f2"] + RedStorm::SimpleBolt.send(:fields).should == {} + Bolt1.send(:fields).should == {"default" => ["f1"]} + Bolt2.send(:fields).should == {"default" => ["f2"]} end end @@ -115,6 +136,7 @@ class Bolt1 < RedStorm::SimpleBolt Bolt1.send(:emit?).should be_true Bolt1.send(:ack?).should be_false Bolt1.send(:anchor?).should be_false + Bolt1.send(:stream?).should be_false end it "should parse :emit option" do @@ -147,16 +169,27 @@ class Bolt1 < RedStorm::SimpleBolt Bolt1.send(:anchor?).should be_true end + it "should parse :stream option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :stream => "test" do + end + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test") + Bolt1.send(:stream?).should be_true + end + it "should parse multiple option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :emit => false, :ack =>true, :anchor => true do + on_receive :emit => false, :ack =>true, :anchor => true, :stream => "test" do end end - Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test") Bolt1.send(:emit?).should be_false Bolt1.send(:ack?).should be_true Bolt1.send(:anchor?).should be_true + Bolt1.send(:stream?).should be_true end end @@ -166,13 +199,13 @@ class Bolt1 < RedStorm::SimpleBolt class Bolt1 < RedStorm::SimpleBolt def test_method; end on_receive :test_method - end Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS Bolt1.send(:emit?).should be_true Bolt1.send(:ack?).should be_false Bolt1.send(:anchor?).should be_false + Bolt1.send(:stream?).should be_false end it "should parse :emit option" do @@ -186,8 +219,7 @@ class Bolt1 < RedStorm::SimpleBolt it "should parse :ack option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :ack => true do - end + on_receive :test_method, :ack => true end Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:ack => true) @@ -196,24 +228,32 @@ class Bolt1 < RedStorm::SimpleBolt it "should parse :anchor option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :anchor => true do - end + on_receive :test_method, :anchor => true end Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:anchor => true) Bolt1.send(:anchor?).should be_true end + it "should parse :stream option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :test_method, :stream => "test" + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test") + Bolt1.send(:stream?).should be_true + end + it "should parse multiple option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :emit => false, :ack =>true, :anchor => true do - end + on_receive :test_method, :emit => false, :ack =>true, :anchor => true, :stream => "test" end - Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test") Bolt1.send(:emit?).should be_false Bolt1.send(:ack?).should be_true Bolt1.send(:anchor?).should be_true + Bolt1.send(:stream?).should be_true end end @@ -227,6 +267,7 @@ class Bolt1 < RedStorm::SimpleBolt Bolt1.send(:emit?).should be_true Bolt1.send(:ack?).should be_false Bolt1.send(:anchor?).should be_false + Bolt1.send(:stream?).should be_false end it "should parse :emit option" do @@ -256,15 +297,25 @@ class Bolt1 < RedStorm::SimpleBolt Bolt1.send(:anchor?).should be_true end + it "should parse :stream option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :stream => "test" + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test") + Bolt1.send(:stream?).should be_true + end + it "should parse multiple option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :emit => false, :ack =>true, :anchor => true + on_receive :emit => false, :ack =>true, :anchor => true, :stream => "test" end - Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test") Bolt1.send(:emit?).should be_false Bolt1.send(:ack?).should be_true Bolt1.send(:anchor?).should be_true + Bolt1.send(:stream?).should be_true end end end @@ -603,6 +654,74 @@ def on_receive(tuple) bolt.prepare(nil, nil, collector) bolt.execute("output") end + + it "should emit tuple on a stream" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :stream => :custom_stream do |tuple| + tuple + end + end + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method, :stream => :custom_stream + def my_method(tuple); tuple; end + end + class Bolt3 < RedStorm::SimpleBolt + on_receive :stream => :custom_stream + def on_receive(tuple); tuple; end + end + + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit_tuple_stream).with(:custom_stream, "values").exactly(3).times + + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + + bolt = Bolt3.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + end + + it "should emit anchored tuple on a stream" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :anchor => true, :stream => :custom_stream do |tuple| + "output" + end + end + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method, :anchor => true, :stream => :custom_stream + def my_method(tuple) + "output" + end + end + class Bolt3 < RedStorm::SimpleBolt + on_receive :anchor => true, :stream => :custom_stream + def on_receive(tuple) + "output" + end + end + + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit_anchor_tuple_stream).with(:custom_stream, "tuple", "values").exactly(3).times + + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + + bolt = Bolt3.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + end end describe "prepare" do @@ -685,10 +804,36 @@ class Bolt1 < RedStorm::SimpleBolt bolt = Bolt1.new class RedStorm::Fields; end declarer = mock("Declarer") - declarer.should_receive(:declare).with("fields") + declarer.should_receive(:declareStream).with("default", "fields") RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") bolt.declare_output_fields(declarer) end + + it "should declare stream with fields" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :stream => [:f1, :f2] + end + bolt = Bolt1.new + class RedStorm::Fields; end + declarer = mock("Declarer") + declarer.should_receive(:declareStream).with("stream", "fields") + RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") + bolt.declare_output_fields(declarer) + end + + it "should declare default stream fields and custom stream fields" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :f1, :f2, :stream => [:f3, :f4] + end + bolt = Bolt1.new + class RedStorm::Fields; end + declarer = mock("Declarer") + declarer.should_receive(:declareStream).with("stream", "stream_fields") + declarer.should_receive(:declareStream).with("default", "default_fields") + RedStorm::Fields.should_receive(:new).with(["f3", "f4"]).and_return("stream_fields") + RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("default_fields") + bolt.declare_output_fields(declarer) + end end describe "get_component_configuration" do @@ -701,4 +846,4 @@ class Bolt1 < RedStorm::SimpleBolt; end end end -end \ No newline at end of file +end diff --git a/spec/red_storm/dsl/output_collector_spec.rb b/spec/red_storm/dsl/output_collector_spec.rb index d604337..82ddeb7 100644 --- a/spec/red_storm/dsl/output_collector_spec.rb +++ b/spec/red_storm/dsl/output_collector_spec.rb @@ -10,5 +10,11 @@ # We should have an alias for #emit_anchor_tuple it { should respond_to :emit_anchor_tuple } + + # We should have an alias for #emit_tuple_stream + it { should respond_to :emit_tuple_stream } + + # We should have an alias for #emit_anchor_tuple_stream + it { should respond_to :emit_anchor_tuple_stream } end end diff --git a/spec/red_storm/dsl/spout_spec.rb b/spec/red_storm/dsl/spout_spec.rb index ece2a8d..e04d677 100644 --- a/spec/red_storm/dsl/spout_spec.rb +++ b/spec/red_storm/dsl/spout_spec.rb @@ -65,21 +65,21 @@ class Spout1 < RedStorm::SimpleSpout output_fields :f1 end - Spout1.send(:fields).should == ["f1"] + Spout1.send(:fields).should == {"default" => ["f1"]} end it "should parse multiple arguments" do class Spout1 < RedStorm::SimpleSpout output_fields :f1, :f2 end - Spout1.send(:fields).should == ["f1", "f2"] + Spout1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should parse string and symbol arguments" do class Spout1 < RedStorm::SimpleSpout output_fields :f1, "f2" end - Spout1.send(:fields).should == ["f1", "f2"] + Spout1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should not share state over mutiple classes" do @@ -89,9 +89,9 @@ class Spout1 < RedStorm::SimpleSpout class Spout2 < RedStorm::SimpleSpout output_fields :f2 end - RedStorm::SimpleSpout.send(:fields).should == [] - Spout1.send(:fields).should == ["f1"] - Spout2.send(:fields).should == ["f2"] + RedStorm::SimpleSpout.send(:fields).should == {} + Spout1.send(:fields).should == {"default" => ["f1"]} + Spout2.send(:fields).should == {"default" => ["f2"]} end end @@ -787,7 +787,7 @@ class Spout1 < RedStorm::SimpleSpout spout = Spout1.new class RedStorm::Fields; end declarer = mock("Declarer") - declarer.should_receive(:declare).with("fields") + declarer.should_receive(:declareStream).with("default", "fields") RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") spout.declare_output_fields(declarer) end @@ -879,4 +879,4 @@ class Spout1 < RedStorm::SimpleSpout; end end end end -end \ No newline at end of file +end diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index 9500679..1c24b25 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -1,5 +1,7 @@ require 'spec_helper' require 'red_storm/dsl/topology' +require 'red_storm/dsl/spout' +require 'red_storm/dsl/bolt' describe RedStorm::SimpleTopology do @@ -21,10 +23,10 @@ class RedStorm::Fields; end Object.send(:remove_const, "SpoutClass2") if Object.const_defined?("SpoutClass2") Object.send(:remove_const, "BoltClass1") if Object.const_defined?("BoltClass1") Object.send(:remove_const, "BoltClass2") if Object.const_defined?("BoltClass2") - class SpoutClass1; end - class SpoutClass2; end - class BoltClass1; end - class BoltClass2; end + class SpoutClass1 < RedStorm::DSL::Spout; end + class SpoutClass2 < RedStorm::DSL::Spout; end + class BoltClass1 < RedStorm::DSL::Bolt; end + class BoltClass2 < RedStorm::DSL::Bolt; end SpoutClass1.should_receive(:base_class_path).at_least(0).times.and_return("base_path") SpoutClass2.should_receive(:base_class_path).at_least(0).times.and_return("base_path") SpoutClass1.should_receive(:java_proxy).at_least(0).times.and_return("RedStorm::JRubySpout") @@ -112,8 +114,30 @@ class Topology1 < RedStorm::SimpleTopology output_fields :f3 end end - Topology1.spouts.first.output_fields.should == ["f1", "f2"] - Topology1.spouts.last.output_fields.should == [ "f3"] + Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] } + Topology1.spouts.last.output_fields.should == { "default" => ["f3"] } + end + + it "should default output_fields to the class defined fields" do + class SpoutClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1 + end + Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] } + end + + it "should override class defined fields with topology output fields" do + class SpoutClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1 do + output_fields :f3, :f4 + end + end + Topology1.spouts.first.output_fields.should == { "default" => ["f3", "f4"] } end end @@ -195,8 +219,31 @@ class Topology1 < RedStorm::SimpleTopology output_fields :f3 end end - Topology1.bolts.first.output_fields.should == ["f1", "f2"] - Topology1.bolts.last.output_fields.should == [ "f3"] + Topology1.bolts.first.output_fields.should == { "default" => ["f1", "f2"] } + Topology1.bolts.last.output_fields.should == { "default" => ["f3"] } + end + + it "should default output_fields to the class defined fields" do + class BoltClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + end + end + Topology1.bolts.first.output_fields.should == { "default" => ["f1", "f2"] } + end + + it "should override class defined fields with topology output fields" do + class BoltClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + output_fields :f3, :f4 + end + end + Topology1.bolts.first.output_fields.should == { "default" => ["f3", "f4"] } end end @@ -307,8 +354,8 @@ class Topology1 < RedStorm::SimpleTopology RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) RedStorm::Configurator.should_receive(:new).and_return(configurator) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout1) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", []).and_return(jruby_spout2) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout1) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", {}).and_return(jruby_spout2) builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1).and_return(declarer) builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1).and_return(declarer) @@ -340,12 +387,13 @@ class Topology1 < RedStorm::SimpleTopology configurator = mock(RedStorm::Configurator) jruby_bolt1 = mock(RedStorm::JRubyBolt) jruby_bolt2 = mock(RedStorm::JRubyBolt) + jruby_bolt3 = mock(RedStorm::JRubyBolt) declarer = mock("Declarer") RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) RedStorm::Configurator.should_receive(:new).and_return(configurator) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt1) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", []).and_return(jruby_bolt2) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt1) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", {}).and_return(jruby_bolt2) builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer) builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer) @@ -376,8 +424,8 @@ class Topology1 < RedStorm::SimpleTopology backtype_config = mock(Backtype::Config) Backtype::Config.should_receive(:new).any_number_of_times.and_return(backtype_config) backtype_config.should_receive(:put) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout) builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer) builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer) @declarer.should_receive("addConfigurations").twice @@ -394,7 +442,20 @@ class Topology1 < RedStorm::SimpleTopology end RedStorm::Fields.should_receive(:new).with("f1").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support single string fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => "f1" }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("f1").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -407,7 +468,20 @@ class Topology1 < RedStorm::SimpleTopology end RedStorm::Fields.should_receive(:new).with("s1").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support single symbolic fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => :s1 }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("s1").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -420,7 +494,20 @@ class Topology1 < RedStorm::SimpleTopology end RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support string array fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => ["f1", "f2"] }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -433,7 +520,20 @@ class Topology1 < RedStorm::SimpleTopology end RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support symbolic array fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => [:s1, :s2] }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -445,7 +545,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("shuffleGrouping").with('1') + @declarer.should_receive("shuffleGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support shuffle with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :shuffle, 'custom_stream' + end + end + + @declarer.should_receive("shuffleGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -457,7 +569,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("localOrShuffleGrouping").with('1') + @declarer.should_receive("localOrShuffleGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support local_or_shuffle with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :local_or_shuffle, 'custom_stream' + end + end + + @declarer.should_receive("localOrShuffleGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -469,7 +593,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("noneGrouping").with('1') + @declarer.should_receive("noneGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support none" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :none, 'custom_stream' + end + end + + @declarer.should_receive("noneGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -481,7 +617,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("globalGrouping").with('1') + @declarer.should_receive("globalGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support global with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :global, 'custom_stream' + end + end + + @declarer.should_receive("globalGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -493,7 +641,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("allGrouping").with('1') + @declarer.should_receive("allGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support all with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :all, 'custom_stream' + end + end + + @declarer.should_receive("allGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -505,7 +665,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("directGrouping").with('1') + @declarer.should_receive("directGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support direct with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :direct, 'custom_stream' + end + end + + @declarer.should_receive("directGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end end @@ -563,7 +735,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == '1' Topology1.bolts.first.id.should == '2' - Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}, 'default'] end it "should support explicit string ids" do @@ -577,7 +749,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == "id1" Topology1.bolts.first.id.should == "id2" - Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}, 'default'] end it "should support implicit string ids" do @@ -591,7 +763,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}, 'default'] end it "should support implicit symbol ids" do @@ -605,7 +777,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}, 'default'] end it "should support implicit class ids" do @@ -619,7 +791,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}, 'default'] end it "should raise on unresolvable" do @@ -633,7 +805,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}, 'default'] lambda {Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)}.should raise_error RuntimeError, "cannot resolve BoltClass1 source id=dummy" end @@ -651,4 +823,4 @@ class Topology1 < RedStorm::SimpleTopology end end -end \ No newline at end of file +end diff --git a/src/main/redstorm/storm/jruby/JRubyBolt.java b/src/main/redstorm/storm/jruby/JRubyBolt.java index a4106e5..abbb49f 100644 --- a/src/main/redstorm/storm/jruby/JRubyBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyBolt.java @@ -6,9 +6,11 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; +import java.util.Iterator; import java.util.Map; import org.jruby.Ruby; +import org.jruby.RubyHash; import org.jruby.RubyObject; import org.jruby.runtime.Helpers; import org.jruby.runtime.builtin.IRubyObject; @@ -27,7 +29,7 @@ */ public class JRubyBolt implements IRichBolt { private final String _realBoltClassName; - private final String[] _fields; + private final Map _fields; private final String _bootstrap; // transient to avoid serialization @@ -41,7 +43,7 @@ public class JRubyBolt implements IRichBolt { * @param realBoltClassName the fully qualified JRuby bolt implementation class name * @param fields the output fields names */ - public JRubyBolt(String baseClassPath, String realBoltClassName, String[] fields) { + public JRubyBolt(String baseClassPath, String realBoltClassName, Map fields) { _realBoltClassName = realBoltClassName; _fields = fields; _bootstrap = "require '" + baseClassPath + "'"; @@ -72,8 +74,13 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { // declareOutputFields is executed in the topology creation time, before serialisation. // just create tmp bolt instance to call declareOutputFields. - if (_fields.length > 0) { - declarer.declare(new Fields(_fields)); + if (_fields.size() > 0) { + Iterator iterator = _fields.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry field = (Map.Entry)iterator.next(); + declarer.declareStream(field.getKey(), new Fields(field.getValue())); + iterator.remove(); + } } else { IRubyObject ruby_bolt = initialize_ruby_bolt(); IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); diff --git a/src/main/redstorm/storm/jruby/JRubySpout.java b/src/main/redstorm/storm/jruby/JRubySpout.java index b30d292..a6d81f2 100644 --- a/src/main/redstorm/storm/jruby/JRubySpout.java +++ b/src/main/redstorm/storm/jruby/JRubySpout.java @@ -6,6 +6,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; +import java.util.Iterator; import java.util.Map; import org.jruby.Ruby; @@ -27,7 +28,7 @@ */ public class JRubySpout implements IRichSpout { private final String _realSpoutClassName; - private final String[] _fields; + private final Map _fields; private final String _bootstrap; // transient to avoid serialization @@ -41,7 +42,7 @@ public class JRubySpout implements IRichSpout { * @param realSpoutClassName the fully qualified JRuby spout implementation class name * @param fields the output fields names */ - public JRubySpout(String baseClassPath, String realSpoutClassName, String[] fields) { + public JRubySpout(String baseClassPath, String realSpoutClassName, Map fields) { _realSpoutClassName = realSpoutClassName; _fields = fields; _bootstrap = "require '" + baseClassPath + "'"; @@ -93,8 +94,13 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { // declareOutputFields is executed in the topology creation time, before serialisation. // just create tmp spout instance to call declareOutputFields. - if (_fields.length > 0) { - declarer.declare(new Fields(_fields)); + if (_fields.size() > 0) { + Iterator iterator = _fields.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry field = (Map.Entry)iterator.next(); + declarer.declareStream(field.getKey(), new Fields(field.getValue())); + iterator.remove(); + } } else { IRubyObject ruby_spout = initialize_ruby_spout(); IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer);