1 module frpd.stream.join;
2 
3 import frpd.stream.stream : Stream, StreamListener;
4 import frpd.stream._accessor : Accessor, AccessorOwner;
5 import std.typecons:tuple,Tuple;
6 import std.traits : Parameters, ReturnType, ForeachType;
7 import std.meta : staticMap;
8 
9 /**	Join multiple streams into one.
10 */
11 template join(alias f) {// TODO: better error reporting is f is not of the right type.
12 	private {
13 		alias F = typeof(f);
14 		alias T = ForeachType!(ReturnType!F);
15 		alias ArrayParams = Parameters!F;
16 		alias Params = staticMap!(ForeachType,ArrayParams);
17 		static assert(Params.length>1, "Stream join function must 2 or more arguments.  Use map for 1 argument.");
18 		alias StreamParams = staticMap!(Stream,Params);
19 		
20 		class JoinStream : Stream!T, AccessorOwner {
21 			//---Values
22 			Tuple!(staticMap!(Accessor,Params)) accessorArgs;	// The Cells from which to extract values from when recalculating.
23 			T[] delegate(ArrayParams) func;	// The function to call with the extracted values.
24 			
25 			//---Constructor
26 			this(	Tuple!StreamParams streamArgs,
27 				T[] delegate(ArrayParams) func,
28 			){
29 				foreach (i,arg;streamArgs) {
30 					accessorArgs[i] = new Accessor!(Params[i])(arg, this);
31 				}
32 				this.func = func;
33 			}
34 			~this() {
35 				foreach (a;accessorArgs) {
36 					destroy(a);
37 				}
38 			}
39 			
40 			//---Methods
41 			//---Listener methods
42 			override void onEventsComming() {
43 				super.onEventsComming;
44 			}
45 			override void push() {
46 				bool ready = true;
47 				foreach(a; accessorArgs) {
48 					if (a.eventsComming)
49 						ready = false;
50 				}
51 				if (ready) {
52 					Tuple!ArrayParams args;
53 					foreach(i,accessor;accessorArgs) {
54 						args[i] = accessor.takeEvents;
55 					}
56 					super.push(func(args.expand));
57 				}
58 			}
59 			
60 		}
61 		
62 		template Array(T) {
63 			alias Array = T[];
64 		}
65 	}
66 	
67 	Stream!T join (StreamParams s) {
68 		return new JoinStream(s.tuple, (ArrayParams es){return f(es);});
69 	};
70 }
71 
72 
73 unittest {
74 	import frpd.stream.sink_stream : stream;
75 	import frpd.stream.listener;
76 	
77 	//---
78 	auto a = stream!int;
79 	auto b = stream!int;
80 	alias joiner = join!((int[] eas, int[] ebs)=>eas~ebs);
81 	auto c = joiner(a,b);
82 	////auto b = a.map!((int e)=>twice(e));
83 	{
84 		import frpd.stream.stream : Stream;
85 		assert(is(typeof(c)==Stream!int));
86 	}
87 	
88 	int[] cs = [];
89 	c.addListener!((int e){cs~=e;});
90 	
91 	assert(cs==[]);
92 	a.put(1);
93 	assert(cs==[1]);
94 	b.put(2);
95 	assert(cs==[1,2]);
96 	b.bufferPut(4);
97 	a.bufferPut(3);
98 	b.push;
99 	a.push;
100 	assert(cs==[1,2,3,4]);
101 }
102 
103 
104