module frpd.stream.join;

import frpd.stream.stream : Stream, StreamListener;
import frpd.stream._accessor : Accessor, AccessorOwner;
import std.typecons : tuple, Tuple;
import std.traits : Parameters, ReturnType, ForeachType;
import std.meta : staticMap;

/** Join multiple streams into one.

    `f` takes one array of events per source stream and returns an array of
    events for the resulting stream.  The element type of each parameter
    array determines the element type of the corresponding source stream, and
    the element type of the returned array determines the element type of the
    joined stream.

    Params:
        f = the joining function; must take 2 or more array arguments.

    Example:
    ---
    alias joiner = join!((int[] eas, int[] ebs) => eas ~ ebs);
    auto c = joiner(a, b);    // a, b : Stream!int;  c : Stream!int
    ---
*/
template join(alias f) { // TODO: better error reporting if f is not of the right type.
    private {
        alias F = typeof(f);
        /// Element type of the joined stream (element type of `f`'s result).
        alias T = ForeachType!(ReturnType!F);
        /// Parameter types of `f`, i.e. one array type per source stream.
        alias ArrayParams = Parameters!F;
        /// Element type of each of `f`'s array parameters.
        alias Params = staticMap!(ForeachType, ArrayParams);
        static assert(Params.length > 1,
            "Stream join function must take 2 or more arguments. Use map for 1 argument.");
        /// The source stream types, one `Stream!E` per element type `E`.
        alias StreamParams = staticMap!(Stream, Params);

        /** Stream that collects the events of several source streams through
            accessors and pushes the result of the joining function once no
            source has events pending.
        */
        class JoinStream : Stream!T, AccessorOwner {
            //---Values
            /// One accessor per source stream; events are taken from these when pushing.
            Tuple!(staticMap!(Accessor, Params)) accessorArgs;
            /// The function to call with the extracted events.
            T[] delegate(ArrayParams) func;

            //---Constructor
            /** Creates one accessor per source stream, each owned by this
                object, and stores the joining function.

                Params:
                    streamArgs = the source streams, in the order of `f`'s parameters.
                    func       = delegate called with the taken events of every source.
            */
            this(
                Tuple!StreamParams streamArgs,
                T[] delegate(ArrayParams) func,
            ) {
                // Iterating a Tuple is unrolled at compile time, so `i` is a
                // compile-time index usable in `Params[i]`.
                foreach (i, arg; streamArgs) {
                    accessorArgs[i] = new Accessor!(Params[i])(arg, this);
                }
                this.func = func;
            }

            /// Destroys every accessor created by the constructor.
            ~this() {
                // NOTE(review): the accessors are GC-allocated class objects.
                // The D spec says references to GC-managed members may be
                // invalid when a destructor runs during a collection, so this
                // is only known to be safe for an explicit `destroy` of this
                // object - confirm against how JoinStream is torn down.
                foreach (a; accessorArgs) {
                    destroy(a);
                }
            }

            //---Methods
            //---Listener methods
            /// Forwards the notification to the base stream unchanged.
            override void onEventsComming() {
                super.onEventsComming;
            }

            /** Pushes the joined events, but only once no accessor reports
                events still coming; otherwise does nothing.
            */
            override void push() {
                bool ready = true;
                foreach (a; accessorArgs) {
                    if (a.eventsComming)
                        ready = false;
                }
                if (ready) {
                    // Take the events of every source, in parameter order,
                    // and hand them to the joining function.
                    Tuple!ArrayParams args;
                    foreach (i, accessor; accessorArgs) {
                        args[i] = accessor.takeEvents;
                    }
                    super.push(func(args.expand));
                }
            }
        }

        /// Maps an element type to its array type.  Not referenced by the code visible here.
        template Array(T) {
            alias Array = T[];
        }
    }

    /** Builds the joined stream from the given source streams.

        Params:
            s = the source streams, one per parameter of `f`, in the same order.
        Returns:
            A new stream, typed as `Stream!T`, backed by a `JoinStream`.
    */
    Stream!T join(StreamParams s) {
        return new JoinStream(tuple(s), (ArrayParams es) { return f(es); });
    }
}


unittest {
    import frpd.stream.sink_stream : stream;
    import frpd.stream.listener;

    //---
    auto a = stream!int;
    auto b = stream!int;
    alias joiner = join!((int[] eas, int[] ebs) => eas ~ ebs);
    auto c = joiner(a, b);
    {
        import frpd.stream.stream : Stream;
        // The joined stream is exposed through the base stream type.
        assert(is(typeof(c) == Stream!int));
    }

    int[] cs = [];
    c.addListener!((int e) { cs ~= e; });

    assert(cs == []);
    a.put(1);
    assert(cs == [1]);
    b.put(2);
    assert(cs == [1, 2]);
    // Buffer on both sources, then push in the opposite order.  The expected
    // result keeps a's events before b's, matching the parameter order of the
    // joining function rather than the order of the pushes.
    b.bufferPut(4);
    a.bufferPut(3);
    b.push;
    a.push;
    assert(cs == [1, 2, 3, 4]);
}