diff --git a/Examples/DifferentialDataflow/Netflix.cs b/Examples/DifferentialDataflow/Netflix.cs new file mode 100644 index 0000000..986613f --- /dev/null +++ b/Examples/DifferentialDataflow/Netflix.cs @@ -0,0 +1,222 @@ +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Input; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text; + +namespace Microsoft.Research.Naiad.Examples.DifferentialDataflow { + + public class Netflix : Example { + + int movieSelect = 1970; + + public struct IntTriple : IEquatable { + public int first; + public int second; + public int third; + + public bool Equals(IntTriple that) { + return first == that.first && second == that.second && third == that.third; + } + + public int CompareTo(IntTriple that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + return this.third - that.third; + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third; + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntTriple(int x, int y, int z) + { + first = x; second = y; third = z; + } + } + + public struct Movie : IEquatable { + public int id; + public int year; + public string title; + + public bool Equals(Movie that) { + return id == that.id && year == that.year && title.Equals(that.title); + } + + public int CompareTo(Movie that) { + if (id != that.id) + return id - that.id; + if (year != that.year) + return year - that.year; + return title.CompareTo(that.title); + } + + public override int GetHashCode() { + return 31 * id + 1234347 * year + 4311 * title.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2}", id, year, title); + } + + public Movie(int x, int y, string z) { + id = x; year = y; title = z; + } + + } + + public struct Rating : IEquatable { + public int movId; + public int userId; + public int rating; + public string date; + + public bool Equals(Rating that) { + return movId == that.movId && userId == that.userId && + rating == that.rating && date.Equals(that.date); + } + + public int CompareTo(Rating that) { + if (movId != that.movId) + return movId - that.movId; + if (userId != that.userId) + return userId - that.userId; + if (rating != that.rating) + return rating - that.rating; + return date.CompareTo(that.date); + } + + public override int GetHashCode() { + return 31 * movId + 1234347 * userId + 4311 * rating + 12315 * + date.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2} {3}", movId, userId, rating, + date); + } + + public Rating(int x, int y, int z, string w) { + movId = x; + userId = y; + rating = z; + date = w; + } + + } + + public IEnumerable SumRatings(IntPair movIds, + IEnumerable ratings) { + int allRatings = 0; + foreach (var rating in ratings) + allRatings += rating; + yield return new IntTriple(movIds.s, movIds.t, allRatings); + } + + public IEnumerable MaxRating(int id, IEnumerable ratings) { + int maxRating = 0; + foreach (var rating in ratings) + maxRating = Math.Max(maxRating, rating); + yield return new IntPair(id, maxRating); + } + + public void Execute(string[] args) { + using (var computation = NewComputation.FromArgs(ref args)) { + var movies = computation.NewInputCollection(); + var ratings = computation.NewInputCollection(); + + var ratingSel = movies.Where((Movie x) => x.year < 1970) + .Join(ratings, + (Movie movie) => movie.id, + (Rating rating) => rating.movId, + (Movie movie, Rating rating) => + new IntTriple(rating.movId, rating.userId, rating.rating)); + + var ratingJoin = ratingSel.Join(ratingSel, + (IntTriple r1) => r1.second, + (IntTriple r2) => r2.second, + (IntTriple r1, IntTriple r2) => + new IntTriple(r1.first, // movId + r1.third * r2.third, // rating * rating + r2.first)); // movId + + var intResult = ratingJoin.GroupBy(row => new IntPair(row.first, row.third), + row => row.second, // Select rating + (movIds, grouped_ratings) => + SumRatings(movIds, grouped_ratings)); + + var matmultfinalPrj = intResult.Join(ratingSel, + (IntTriple intRes) => intRes.first, // movId + (IntTriple ratSel) => ratSel.first, // movId + (IntTriple intRes, IntTriple ratSel) => + new IntTriple(intRes.first, + intRes.third * ratSel.third, + ratSel.second)); + + var predicted = matmultfinalPrj.GroupBy(row => new IntPair(row.first, row.third), + row => row.second, // Select rating + (movIds, grouped_ratings) => + SumRatings(movIds, grouped_ratings)); + + var prediction = predicted.GroupBy(row => row.second, + row => row.third, + (id, movId) => MaxRating(id, movId)) + .Subscribe(l => { foreach (var element in l) Console.WriteLine(element); }); + + computation.Activate(); + var inputFile = args[1]; + if (computation.Configuration.ProcessID == 0) { + StreamReader reader = File.OpenText(inputFile); + + for (var i = 0; !reader.EndOfStream; i++) { + var elements = reader.ReadLine().Split(' '); + if (elements.Length == 3) { + movies.OnNext(new Movie(Convert.ToInt32(elements[0]), + Convert.ToInt32(elements[1]), + elements[2])); + ratings.OnNext(); + computation.Sync(i); + } + if (elements.Length == 4) { + movies.OnNext(); + ratings.OnNext(new Rating(Convert.ToInt32(elements[0]), + Convert.ToInt32(elements[1]), + Convert.ToInt32(elements[2]), + elements[3])); + computation.Sync(i); + } + } + } + + movies.OnCompleted(); + ratings.OnCompleted(); + + computation.Join(); + } + } + + public string Usage { get { return ""; } } + + public string Help { + get { + return "Netflix "; + } + } + } + +} \ No newline at end of file diff --git a/Examples/Examples.csproj b/Examples/Examples.csproj index 0386abc..add485f 100644 --- a/Examples/Examples.csproj +++ b/Examples/Examples.csproj @@ -1,4 +1,4 @@ - + @@ -74,6 +74,7 @@ + @@ -81,8 +82,18 @@ + + + + + + + + + + @@ -116,4 +127,4 @@ --> - \ No newline at end of file + diff --git a/Examples/Naiad/Join.cs b/Examples/Naiad/Join.cs new file mode 100644 index 0000000..5ae1bb4 --- /dev/null +++ b/Examples/Naiad/Join.cs @@ -0,0 +1,131 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using System.Diagnostics; +using System.IO; +using System.Text; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.Join +{ + + public class Join : Example + { + public struct IntPair : IEquatable { + public int s; + public int t; + + public bool Equals(IntPair that) { + return s == that.s && t == that.t; + } + + public int CompareTo(IntPair that) { + if (this.s != that.s) + return this.s - that.s; + + return this.t - that.t; + } + + public override int GetHashCode() { + return 47 * s + 36425232 * t; + } + + public override string ToString() { + return String.Format("{0} {1}", s, t); + } + + public IntPair(int ss, int tt) { + s = ss; + t = tt; + } + } + + public struct IntTriple : IEquatable { + public int first; + public int second; + public int third; + + public bool Equals(IntTriple that) { + return first == that.first && second == that.second && third == that.third; + } + + public int CompareTo(IntTriple that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + return this.third - that.third; + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third; + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntTriple(int x, int y, int z) { + first = x; second = y; third = z; + } + } + + public IEnumerable read_rel(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntPair(Convert.ToInt32(elements[0]), + Convert.ToInt32(elements[1])); + } + } + + public string Usage {get {return "";} } + + public void Execute(string[] args) { + using (var computation = NewComputation.FromArgs(ref args)) { + var left_input = new BatchedDataSource(); + var right_input = new BatchedDataSource(); + var left = computation.NewInput(left_input).SelectMany(filename => read_rel(filename)); + var right = computation.NewInput(right_input).SelectMany(filename => read_rel(filename)); + + var output = left.Join(right, + left_row => left_row.t, + right_row => right_row.t, + (left_row, right_row) => new IntTriple(left_row.s, left_row.t, right_row.s)); + + int minThreadId = computation.Configuration.ProcessID * + computation.Configuration.WorkerCount; + StreamWriter[] file_output = + new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_output[i] = new StreamWriter(args[1] + "/join" + j + ".out"); + } + output.Subscribe((i, l) => { foreach (var element in l) file_output[i - minThreadId].WriteLine(element); }); + computation.Activate(); + left_input.OnCompleted(args[1] + "/join_left" + computation.Configuration.ProcessID + ".in"); + right_input.OnCompleted(args[1] + "/join_right" + computation.Configuration.ProcessID + ".in"); + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_output[i].Close(); + } + } + } + + public string Help { + get { + return "Join "; + } + } + } + +} diff --git a/Examples/Naiad/JoinLJ.cs b/Examples/Naiad/JoinLJ.cs new file mode 100644 index 0000000..ed9a462 --- /dev/null +++ b/Examples/Naiad/JoinLJ.cs @@ -0,0 +1,149 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using System.Diagnostics; +using System.IO; +using System.Text; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.JoinLJ +{ + + public class JoinLJ : Example { + + public struct IntDouble : IEquatable { + public int first; + public double second; + + public bool Equals(IntDouble that) { + return first == that.first && second == that.second; + } + + public int CompareTo(IntDouble that) { + if (first != that.first) + return first - that.first; + if (second < that.second) { + return -1; + } else if (second > that.second) { + return 1; + } else { + return 0; + } + } + + public override int GetHashCode() { + return 47 * first + 36425232 * second.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1}", first, second); + } + + public IntDouble(int x, double y) { + first = x; + second = y; + } + + } + + public struct IntIntDouble : IEquatable { + public int first; + public int second; + public double third; + + public bool Equals(IntIntDouble that) { + return first == that.first && second == that.second && + Math.Abs(third - that.third) < 0.0000001; + } + + public int CompareTo(IntIntDouble that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + return this.third - that.third < 0 ? -1 : 1; + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntIntDouble(int x, int y, double z) { + first = x; + second = y; + third = z; + } + } + + public IEnumerable read_edges(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntPair(Convert.ToInt32(elements[0]), + Convert.ToInt32(elements[1])); + } + } + + public IEnumerable read_vertices(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntDouble(Convert.ToInt32(elements[0]), + Convert.ToDouble(elements[1])); + } + } + + public string Usage {get {return "";} } + + public void Execute(string[] args) { + using (var computation = NewComputation.FromArgs(ref args)) { + var left_input = new BatchedDataSource(); + var right_input = new BatchedDataSource(); + var left = computation.NewInput(left_input).SelectMany(filename => read_edges(filename)); + var right = computation.NewInput(right_input).SelectMany(filename => read_vertices(filename)); + + var output = left.Join(right, + left_row => left_row.s, + right_row => right_row.first, + (left_row, right_row) => new IntIntDouble(left_row.s, + left_row.t, right_row.second)); + + int minThreadId = computation.Configuration.ProcessID * + computation.Configuration.WorkerCount; + StreamWriter[] file_output = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_output[i] = new StreamWriter(args[3] + j + ".out"); + } + output.Subscribe((i, l) => { foreach (var element in l) file_output[i - minThreadId].WriteLine(element); }); + computation.Activate(); + left_input.OnCompleted(args[1]); + right_input.OnCompleted(args[2]); + + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_output[i].Close(); + } + } + } + + public string Help { + get { + return "JoinLJ "; + } + } + } + +} diff --git a/Examples/Naiad/KMeans.cs b/Examples/Naiad/KMeans.cs new file mode 100644 index 0000000..80b7ef5 --- /dev/null +++ b/Examples/Naiad/KMeans.cs @@ -0,0 +1,276 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading.Tasks; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Frameworks.Reduction; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.KMeans +{ + + public class KMeans : Example { + + public struct DoubleDouble : IEquatable, IComparable { + public double first; + public double second; + + public bool Equals(DoubleDouble that) { + return Math.Abs(first - that.first) < 0.0000001 && + Math.Abs(second - that.second) < 0.0000001; + } + + public int CompareTo(DoubleDouble that) { + if (Math.Abs(this.first - that.first) > 0.0000001) + return this.first - that.first < 0 ? -1 : 1; + if (Math.Abs(this.second - that.second) < 0.0000001) + return 0; + return this.second - that.second < 0 ? -1 : 1; + } + + public override int GetHashCode() { + return 47 * first.GetHashCode() + 36425232 * second.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1}", first, second); + } + + public DoubleDouble(double x, double y) { + first = x; + second = y; + } + + } + + public struct DoubleDoubleDouble : IEquatable { + public double first; + public double second; + public double third; + + public bool Equals(DoubleDoubleDouble that) { + return Math.Abs(first - that.first) < 0.0000001 && + Math.Abs(second - that.second) < 0.0000001 && + Math.Abs(third - that.third) < 0.0000001; + } + + public int CompareTo(DoubleDoubleDouble that) { + if (Math.Abs(this.first - that.first) > 0.0000001) + return this.first - that.first < 0 ? -1 : 1; + if (Math.Abs(this.second - that.second) > 0.0000001) + return this.second - that.second < 0 ? -1 : 1; + if (Math.Abs(this.third - that.third) < 0.0000001) + return 0; + return this.third - that.third < 0 ? -1 : 1; + } + + public override int GetHashCode() { + return 47 * first.GetHashCode() + 36425232 * second.GetHashCode() + 17 * + third.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public DoubleDoubleDouble(double x, double y, double z) { + first = x; + second = y; + third = z; + } + + } + + public struct DoubleDoubleDoubleDouble : IEquatable { + public double first; + public double second; + public double third; + public double fourth; + + public bool Equals(DoubleDoubleDoubleDouble that) { + return Math.Abs(first - that.first) < 0.0000001 && + Math.Abs(second - that.second) < 0.0000001 && + Math.Abs(third - that.third) < 0.0000001 && + Math.Abs(fourth - that.fourth) < 0.0000001; + } + + public int CompareTo(DoubleDoubleDoubleDouble that) { + if (Math.Abs(this.first - that.first) > 0.0000001) + return this.first - that.first < 0 ? -1 : 1; + if (Math.Abs(this.second - that.second) > 0.0000001) + return this.second - that.second < 0 ? -1 : 1; + if (Math.Abs(this.third - that.third) > 0.0000001) + return this.third - that.third < 0 ? -1 : 1; + if (Math.Abs(this.fourth - that.fourth) < 0.0000001) + return 0; + return this.fourth - that.fourth < 0 ? -1 : 1; + } + + public override int GetHashCode() { + return 47 * first.GetHashCode() + 36425232 * second.GetHashCode() + 17 * + third.GetHashCode() + fourth.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2} {3}", first, second, third, fourth); + } + + public DoubleDoubleDoubleDouble(double x, double y, double z, double xx) { + first = x; + second = y; + third = z; + fourth = xx; + } + + public static bool operator <(DoubleDoubleDoubleDouble l, + DoubleDoubleDoubleDouble r) { + return l.CompareTo(r) < 0; + } + + public static bool operator >(DoubleDoubleDoubleDouble l, + DoubleDoubleDoubleDouble r) { + return l.CompareTo(r) > 0; + } + + } + + public IEnumerable ReadPoints(string filename) { + if (File.Exists(filename)) { + var file = File.OpenText(filename); + while (!file.EndOfStream) { + var elements = file.ReadLine().Split(' '); + yield return new DoubleDouble(Double.Parse(elements[0]), + Double.Parse(elements[1])); + } + } else { + Console.WriteLine("File not found! {0}", filename); + } + } + + public IEnumerable ReadClusters(string filename) { + if (File.Exists(filename)) { + var file = File.OpenText(filename); + while (!file.EndOfStream) { + var elements = file.ReadLine().Split(' '); + yield return new DoubleDoubleDouble(Double.Parse(elements[0]), + Double.Parse(elements[1]), + Double.Parse(elements[2])); + } + } else { + Console.WriteLine("File not found! {0}", filename); + } + } + + public IEnumerable MinClst(DoubleDouble point, + IEnumerable pntclsts) { + double minDist = Double.MaxValue; + DoubleDoubleDoubleDouble minRow = + new DoubleDoubleDoubleDouble(0, 0, 0, 0); + foreach (var pntclst in pntclsts) { + if (pntclst.third < minDist) { + minDist = pntclst.third; + minRow = pntclst; + } + } + yield return new DoubleDoubleDouble(point.first, point.second, minRow.fourth); + } + + public IEnumerable CalcAvg(Double clst_id, + IEnumerable clsts) { + double sumx = 0; + double sumy = 0; + double cnt = 0; + foreach (var clst in clsts) { + sumx += clst.first; + sumy += clst.second; + cnt += 1; + } + yield return new DoubleDoubleDouble(sumx / cnt, sumy / cnt, clst_id); + } + + public Stream KMeansIter( + Stream points_in, + Stream clusters_in) + where T: Time { + var pnt_clsts = points_in.Join(clusters_in, + (DoubleDouble point) => 1, + (DoubleDoubleDouble cluster) => 1, + (DoubleDouble point, DoubleDoubleDouble cluster) => + new DoubleDoubleDoubleDouble(point.first, + point.second, + (cluster.first - point.first) * (cluster.first - point.first) + (cluster.second - point.second) * (cluster.second - point.second), + cluster.third)); + return pnt_clsts.Min(row => new DoubleDouble(row.first, row.second), + row => new DoubleDouble(row.third, row.fourth)) + .Select(row => new DoubleDoubleDouble(row.First.first, row.First.second, row.Second.second)) + .GroupBy(row => row.third, + (clst_id, pnts) => CalcAvg(clst_id, pnts)); + } + + public string Usage {get {return "";} } + + public void Execute(string[] args) { + using (var computation = Naiad.NewComputation.FromArgs(ref args)) { + var stopwatch = System.Diagnostics.Stopwatch.StartNew(); + + var points = new BatchedDataSource(); + var clusters = new BatchedDataSource(); + + var points_in = computation.NewInput(points).SelectMany(file => ReadPoints(file)); + var clusters_in = computation.NewInput(clusters).SelectMany(file => ReadClusters(file)); + + // var output = clusters_in.IterateAndAccumulate((lc, deltas) => + // deltas.Join(lc.EnterLoop(points_in), + // (DoubleDoubleDouble cluster) => 1, + // (DoubleDouble point) => 1, + // (DoubleDoubleDouble cluster, DoubleDouble point) => + // new DoubleDoubleDoubleDouble(point.first, + // point.second, + // (cluster.first - point.first) * + // (cluster.first - point.first) + + // (cluster.second - point.second) * + // (cluster.second - point.second), + // cluster.third)) + // .GroupBy(row => new DoubleDouble(row.first, row.second), + // (pnts, pntclsts) => MinClst(pnts, pntclsts)), + // .GroupBy(row => row.third, + // (clst_id, pnts) => CalcAvg(clst_id, pnts)), + // null, 5, "KMeans"); + + for (int i = 0; i < 5; ++i) { + clusters_in = KMeansIter(points_in, clusters_in); + } + + int minThreadId = computation.Configuration.ProcessID * + computation.Configuration.WorkerCount; + StreamWriter[] file_out = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_out[i] = new StreamWriter("/tmp/clusters/clusters" + j + ".out"); + } + + clusters_in.Subscribe((i, l) => { foreach (var element in l) file_out[i - minThreadId].WriteLine(element); }); + computation.Activate(); + points.OnCompleted(args[1]); + clusters.OnCompleted(args[2]); + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_out[i].Close(); + } + } + } + + public string Help { + get { + return "KMeans "; + } + } + } + +} diff --git a/Examples/Naiad/Netflix.cs b/Examples/Naiad/Netflix.cs new file mode 100644 index 0000000..702ee08 --- /dev/null +++ b/Examples/Naiad/Netflix.cs @@ -0,0 +1,304 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using System.Diagnostics; +using System.IO; +using System.Text; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Frameworks.Reduction; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.Netflix +{ + + public class Netflix : Example { + + public struct LongPair : IEquatable + { + public long s; + public long t; + + public bool Equals(LongPair that) + { + return s == that.s && t == that.t; + } + + public long CompareTo(LongPair that) + { + if (this.s != that.s) + return this.s - that.s; + + return this.t - that.t; + } + + public override int GetHashCode() + { + return 47 * s.GetHashCode() + 36425232 * t.GetHashCode(); + } + + public override string ToString() + { + return String.Format("{0} {1}", s, t); + } + + public LongPair(long ss, long tt) + { + s = ss; + t = tt; + } + + } + + public struct LongTriple : IEquatable + { + public long first; + public long second; + public long third; + + public bool Equals(LongTriple that) + { + return first == that.first && second == that.second && third == that.third; + } + + public long CompareTo(LongTriple that) + { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + return this.third - that.third; + } + + // Embarassing hashcodes + public override int GetHashCode() + { + return first.GetHashCode() + 1234347 * second.GetHashCode() + + 4311 * third.GetHashCode(); + } + + public override string ToString() + { + return String.Format("{0} {1} {2}", first, second, third); + } + + public LongTriple(long x, long y, long z) + { + first = x; second = y; third = z; + } + } + + public struct Movie : IEquatable + { + public long id; + public long year; + public string title; + + public bool Equals(Movie that) + { + return id == that.id && year == that.year && title.Equals(that.title); + } + + public long CompareTo(Movie that) + { + if (id != that.id) + return id - that.id; + if (year != that.year) + return year - that.year; + return title.CompareTo(that.title); + } + + public override int GetHashCode() + { + return 31 * id.GetHashCode() + 1234347 * year.GetHashCode() + + 4311 * title.GetHashCode(); + } + + public override string ToString() + { + return String.Format("{0} {1} {2}", id, year, title); + } + + public Movie(long x, long y, string z) + { + id = x; year = y; title = z; + } + + } + + public struct Rating : IEquatable + { + public long movId; + public long userId; + public long rating; + + public bool Equals(Rating that) + { + return movId == that.movId && userId == that.userId && + rating == that.rating; + } + + public long CompareTo(Rating that) + { + if (movId != that.movId) + return movId - that.movId; + if (userId != that.userId) + return userId - that.userId; + return rating - that.rating; + } + + public override int GetHashCode() + { + return 31 * movId.GetHashCode() + 1234347 * userId.GetHashCode() + + 4311 * rating.GetHashCode(); + } + + public override string ToString() + { + return String.Format("{0} {1} {2}", movId, userId, rating); + } + + public Rating(long x, long y, long z) + { + movId = x; + userId = y; + rating = z; + } + + } + + public IEnumerable SumRatings(LongPair movIds, + IEnumerable ratings) { + long allRatings = 0; + foreach (var rating in ratings) + allRatings += rating.second; + yield return new LongTriple(movIds.s, movIds.t, allRatings); + } + + public IEnumerable MaxRating(long id, IEnumerable ratings) { + long maxRating = 0; + foreach (var rating in ratings) + maxRating = Math.Max(maxRating, rating.third); + yield return new LongPair(id, maxRating); + } + + public IEnumerable ReadRatings(String filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new Rating(Convert.ToInt64(elements[0]), + Convert.ToInt64(elements[1]), + Convert.ToInt64(elements[2])); + } + } + + public IEnumerable ReadMovies(String filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new Movie(Convert.ToInt64(elements[0]), + Convert.ToInt64(elements[1]), + elements[2]); + } + } + + public string Usage {get {return "year ratings_file movies_file [optimized]";} } + + public void Execute(string[] args) { + using (var computation = NewComputation.FromArgs(ref args)) { + var movies = new BatchedDataSource(); + var ratings = new BatchedDataSource(); + bool optimized = true; + if (args.Length == 6) { + optimized = Convert.ToBoolean(args[5]); + } + long year = Convert.ToInt64(args[1]); + + var movies_in = computation.NewInput(movies) + .SelectMany(file => ReadMovies(file)); + var ratings_in = computation.NewInput(ratings) + .SelectMany(file => ReadRatings(file)); + + var ratingSel = movies_in.Where((Movie x) => x.year < year) + .Join(ratings_in, + (Movie movie) => movie.id, + (Rating rating) => rating.movId, + (Movie movie, Rating rating) => + new LongTriple(rating.movId, rating.userId, rating.rating)); + + var ratingJoin = ratingSel.Join(ratingSel, + (LongTriple r1) => r1.second, + (LongTriple r2) => r2.second, + (LongTriple r1, LongTriple r2) => + new LongTriple(r1.first, // movId + r1.third * r2.third, // rating * rating + r2.first)); // movId + + var intResult = ratingJoin.IntSum(row => new LongPair(row.first, row.third), + row => row.second); + + var matmultfinalPrj = intResult.Join(ratingSel, + intRes => intRes.First.s, // movId + ratSel => ratSel.first, // movId + (intRes, ratSel) => + new LongTriple(intRes.First.s, + intRes.Second * ratSel.third, + ratSel.second)); + + var predicted = matmultfinalPrj.IntSum(row => new LongPair(row.first, row.third), + row => row.second); + + if (optimized) { + int minThreadId = computation.Configuration.ProcessID * + computation.Configuration.WorkerCount; + StreamWriter[] file_out = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_out[i] = new StreamWriter(args[4] + "/prediction" + j + ".out"); + } + + var prediction = predicted.Max(row => row.First.t, + row => row.Second) + .Subscribe((i, l) => { foreach (var element in l) file_out[i - minThreadId].WriteLine(element); }); + + computation.Activate(); + movies.OnCompleted(args[4] + "/" + args[3]); + ratings.OnCompleted(args[4] + "/" + args[2]); + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_out[i].Close(); + } + } else { + var prediction = predicted.Max(row => row.First.t, + row => row.Second); + StreamWriter file_out = new StreamWriter(args[4] + "/prediction.out"); + if (computation.Configuration.ProcessID == 0) { + prediction.Subscribe(l => { foreach (var element in l) file_out.WriteLine(element); }); + computation.Activate(); + movies.OnCompleted(args[4] + "/" + args[3]); + ratings.OnCompleted(args[4] + "/" + args[2]); + } else { + computation.Activate(); + movies.OnCompleted(); + ratings.OnCompleted(); + } + computation.Join(); + file_out.Close(); + } + } + } + + public string Help { + get { + return "Netflix "; + } + } + + } + +} diff --git a/Examples/Naiad/NetflixOld.cs b/Examples/Naiad/NetflixOld.cs new file mode 100644 index 0000000..cd28f7d --- /dev/null +++ b/Examples/Naiad/NetflixOld.cs @@ -0,0 +1,306 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using System.Diagnostics; +using System.IO; +using System.Text; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Frameworks.Reduction; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.Netflix +{ + + public class LongSumReducer : ReducerBase + { + public long value; + + public LongSumReducer(long value) + { + this.value = value; + } + + public override ReducerBase Add(ReducerBase other) + { + LongSumReducer otherCast = (LongSumReducer)other; + return new LongSumReducer(value + otherCast.value); + } + + } + + public class Netflix : Example { + + public struct LongPair : IEquatable + { + public long s; + public long t; + + public bool Equals(LongPair that) + { + return s == that.s && t == that.t; + } + + public long CompareTo(LongPair that) + { + if (this.s != that.s) + return this.s - that.s; + + return this.t - that.t; + } + + public override int GetHashCode() + { + return 47 * s.GetHashCode() + 36425232 * t.GetHashCode(); + } + + public override string ToString() + { + return String.Format("{0} {1}", s, t); + } + + public LongPair(long ss, long tt) + { + s = ss; + t = tt; + } + + } + + public struct LongTriple : IEquatable + { + public long first; + public long second; + public long third; + + public bool Equals(LongTriple that) + { + return first == that.first && second == that.second && third == that.third; + } + + public long CompareTo(LongTriple that) + { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + return this.third - that.third; + } + + // Embarassing hashcodes + public override int GetHashCode() + { + return first.GetHashCode() + 1234347 * second.GetHashCode() + + 4311 * third.GetHashCode(); + } + + public override string ToString() + { + return String.Format("{0} {1} {2}", first, second, third); + } + + public LongTriple(long x, long y, long z) + { + first = x; second = y; third = z; + } + } + + public struct Movie : IEquatable + { + public long id; + public long year; + public string title; + + public bool Equals(Movie that) + { + return id == that.id && year == that.year && title.Equals(that.title); + } + + public long CompareTo(Movie that) + { + if (id != that.id) + return id - that.id; + if (year != that.year) + return year - that.year; + return title.CompareTo(that.title); + } + + public override int GetHashCode() + { + return 31 * id.GetHashCode() + 1234347 * year.GetHashCode() + + 4311 * title.GetHashCode(); + } + + public override string ToString() + { + return String.Format("{0} {1} {2}", id, year, title); + } + + public Movie(long x, long y, string z) + { + id = x; year = y; title = z; + } + + } + + public struct Rating : IEquatable + { + public long movId; + public long userId; + public long rating; + + public bool Equals(Rating that) + { + return movId == that.movId && userId == that.userId && + rating == that.rating; + } + + public long CompareTo(Rating that) + { + if (movId != that.movId) + return movId - that.movId; + if (userId != that.userId) + return userId - that.userId; + return rating - that.rating; + } + + public override int GetHashCode() + { + return 31 * movId.GetHashCode() + 1234347 * userId.GetHashCode() + + 4311 * rating.GetHashCode(); + } + + public override string ToString() + { + return String.Format("{0} {1} {2}", movId, userId, rating); + } + + public Rating(long x, long y, long z) + { + movId = x; + userId = y; + rating = z; + } + + } + + public IEnumerable SumRatings(LongPair movIds, + IEnumerable ratings) { + long allRatings = 0; + foreach (var rating in ratings) + allRatings += rating.second; + yield return new LongTriple(movIds.s, movIds.t, allRatings); + } + + public IEnumerable MaxRating(long id, IEnumerable ratings) { + long maxRating = 0; + foreach (var rating in ratings) + maxRating = Math.Max(maxRating, rating.third); + yield return new LongPair(id, maxRating); + } + + public IEnumerable ReadRatings(String filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new Rating(Convert.ToInt64(elements[0]), + Convert.ToInt64(elements[1]), + Convert.ToInt64(elements[2])); + } + } + + public IEnumerable ReadMovies(String filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new Movie(Convert.ToInt64(elements[0]), + Convert.ToInt64(elements[1]), + elements[2]); + } + } + + public string Usage {get {return "ratings_file movies_file year";} } + + public void Execute(string[] args) { + using (var computation = NewComputation.FromArgs(ref args)) { + var movies = new BatchedDataSource(); + var ratings = new BatchedDataSource(); + if (args.Length != 4) { + Console.WriteLine("Please specify input files"); + } + long year = Convert.ToInt64(args[3]); + + var movies_in = computation.NewInput(movies) + .SelectMany(file => ReadMovies(file)); + var ratings_in = computation.NewInput(ratings) + .SelectMany(file => ReadRatings(file)); + + var ratingSel = movies_in.Where((Movie x) => x.year < year) + .Join(ratings_in, + (Movie movie) => movie.id, + (Rating rating) => rating.movId, + (Movie movie, Rating rating) => + new LongTriple(rating.movId, rating.userId, rating.rating)); + + var ratingJoin = ratingSel.Join(ratingSel, + (LongTriple r1) => r1.second, + (LongTriple r2) => r2.second, + (LongTriple r1, LongTriple r2) => + new LongTriple(r1.first, // movId + r1.third * r2.third, // rating * rating + r2.first)); // movId + + var intResult = ratingJoin.IntSum(row => new LongPair(row.first, row.third), + row => row.second); + + // var testRes = ratingJoin.GenericAggregator(row => new LongPair(row.first, row.third), + // row => new LongSumReducer(row.second)); + // var intResult = testRes.Select(row => new Pair(row.v1, ((LongSumReducer)row.v2).value)); + + var matmultfinalPrj = intResult.Join(ratingSel, + intRes => intRes.v1.s, // movId + ratSel => ratSel.first, // movId + (intRes, ratSel) => + new LongTriple(intRes.v1.s, + intRes.v2 * ratSel.third, + ratSel.second)); + + var predicted = matmultfinalPrj.IntSum(row => new LongPair(row.first, row.third), + row => row.second); + + int minThreadId = computation.Configuration.ProcessID * + computation.Configuration.WorkerCount; + StreamWriter[] file_out = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_out[i] = new StreamWriter("/tmp/home/icg27/prediction/prediction" + j + ".out"); + } + + var prediction = predicted.Max(row => row.v1.t, + row => row.v2) + .Subscribe((i, l) => { foreach (var element in l) file_out[i - minThreadId].WriteLine(element); }); + + computation.Activate(); + movies.OnCompleted(args[2]); + ratings.OnCompleted(args[1]); + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_out[i].Close(); + } + } + } + + public string Help { + get { + return "Netflix"; + } + } + + } + +} \ No newline at end of file diff --git a/Examples/Naiad/PageRank.cs b/Examples/Naiad/PageRank.cs new file mode 100644 index 0000000..deebb91 --- /dev/null +++ b/Examples/Naiad/PageRank.cs @@ -0,0 +1,281 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading.Tasks; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Dataflow.Channels; +using Microsoft.Research.Naiad.DataStructures; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Frameworks.Reduction; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.PageRank +{ + + public class PageRank : Example { + + public struct IntInt : IEquatable { + public int first; + public int second; + + public bool Equals(IntInt that) { + return first == that.first && second == that.second; + } + + public int CompareTo(IntInt that) { + if (this.first != that.first) + return this.first - that.first; + + return this.second - that.second; + } + + public override int GetHashCode() { + return 47 * first + 36425232 * second; + } + + public override string ToString() { + return String.Format("{0} {1}", first, second); + } + + public IntInt(int ss, int tt) { + first = ss; + second = tt; + } + + } + + public struct IntDouble : IEquatable { + public int first; + public double second; + + public bool Equals(IntDouble that) { + return first == that.first && second == that.second; + } + + public int CompareTo(IntDouble that) { + if (first != that.first) + return first - that.first; + if (second < that.second) { + return -1; + } else if (second > that.second) { + return 1; + } else { + return 0; + } + } + + public override int GetHashCode() { + return 47 * first + 36425232 * second.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1}", first, second); + } + + public IntDouble(int x, double y) { + first = x; + second = y; + } + + } + + public IEnumerable read_edges(string filename) + { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) + { + var elements = reader.ReadLine().Split(' '); + yield return new IntInt(Convert.ToInt32(elements[0]),Convert.ToInt32(elements[1])); + } + } + + public IEnumerable read_pr(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntDouble(Convert.ToInt32(elements[0]),Convert.ToDouble(elements[1])); + } + } + + public struct Reducernode_cnt : IAddable + { + + public int value0; + + public Reducernode_cnt(int value0) + { + this.value0 = value0; + } + + public Reducernode_cnt Add(Reducernode_cnt other) + { + value0 += other.value0; + return this; + } + + } + + public struct Reducerpr1 : IAddable + { + + public double value0; + + public Reducerpr1(double value0) + { + this.value0 = value0; + } + + public Reducerpr1 Add(Reducerpr1 other) + { + value0 += other.value0; + return this; + } + + } + + + public struct IntIntInt : IEquatable + { + public int first; + public int second; + public int third; + + public bool Equals(IntIntInt that) + { + return first == that.first && second == that.second && third == that.third; + } + + public int CompareTo(IntIntInt that) + { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + return this.third - that.third; + } + + // Embarassing hashcodes + public override int GetHashCode() + { + return first + 1234347 * second + 4311 * third; + } + + public override string ToString() + { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntIntInt(int x, int y, int z) + { + first = x; + second = y; + third = z; + } + } + + public struct IntIntIntDouble : IEquatable { + public int first; + public int second; + public int third; + public double fourth; + + public bool Equals(IntIntIntDouble that) { + return first == that.first && second == that.second && + third == that.third && Math.Abs(fourth - that.fourth) < 0.0000001; + } + + public int CompareTo(IntIntIntDouble that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + if (this.third != that.third) + return this.third - that.third; + + return this.fourth - that.fourth < 0 ? -1 : 1; + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third + fourth.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2} {3}", first, second, third, fourth); + } + + public IntIntIntDouble(int x, int y, int z, double zz) { + first = x; + second = y; + third = z; + fourth = zz; + } + } + + public string Usage {get {return " ";} } + + public void Execute(string[] args) { + using (var computation = NewComputation.FromArgs(ref args)) { + var edges_input = new BatchedDataSource(); + var pr_input = new BatchedDataSource(); + var edges = computation.NewInput(edges_input).SelectMany(line => read_edges(line)); + var pr = computation.NewInput(pr_input).SelectMany(line => read_pr(line)); + var node_cnt_tmp = + edges.GenericAggregator(row => row.first, + row => new Reducernode_cnt(1)); + var node_cnt = node_cnt_tmp.Select(row => { Reducernode_cnt aggVal = (Reducernode_cnt)row.Second ; return new IntInt(row.First, aggVal.value0); }); + var edgescnt = edges.Join(node_cnt, + (IntInt left) => left.first, + (IntInt right) => right.first, + (IntInt left, IntInt right) => new IntIntInt(left.first, left.second , right.second)); + for (int i = 0; i < 5; ++i) { + var edgespr = edgescnt.Join(pr, + (IntIntInt left) => left.first, + (IntDouble right) => right.first, + (IntIntInt left, IntDouble right) => new IntIntIntDouble(left.first, left.second, left.third , right.second)); + var rankcnt = edgespr.Select(row => new IntIntIntDouble(row.first,row.second,row.third,row.fourth / row.third)); + var links = rankcnt.Select(row => new IntDouble(row.second,row.fourth)); + var pr1_tmp = links.GenericAggregator(row => row.first, + row => new Reducerpr1(row.second)); + var pr1 = pr1_tmp.Select(row => { Reducerpr1 aggVal = (Reducerpr1)row.Second ; return new IntDouble(row.First, aggVal.value0); }); + var pr2 = pr1.Select(row => new IntDouble(row.first,0.85 * row.second)); + pr = pr2.Select(row => new IntDouble(row.first,0.15 + row.second)); + } + + int minThreadId = computation.Configuration.ProcessID * computation.Configuration.WorkerCount; + + StreamWriter[] file_pr = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_pr[i] = new StreamWriter(args[2] + "/pagerank_" + args[1] + j + ".out"); + } + + pr.Subscribe((i, l) => { foreach (var element in l) file_pr[i - minThreadId].WriteLine(element); }); + + computation.Activate(); + edges_input.OnCompleted(args[2] + "/pagerank_" + args[1] + "_edges" + computation.Configuration.ProcessID + ".in"); + pr_input.OnCompleted(args[2] + "/pagerank_" + args[1] + "_vertices" + computation.Configuration.ProcessID + ".in"); + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_pr[i].Close(); + } + } + } + + public string Help { + get { + return "PageRank "; + } + } + + } +} diff --git a/Examples/Naiad/PageRankV2.cs b/Examples/Naiad/PageRankV2.cs new file mode 100644 index 0000000..97cb0e8 --- /dev/null +++ b/Examples/Naiad/PageRankV2.cs @@ -0,0 +1,140 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading.Tasks; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Dataflow.Channels; +using Microsoft.Research.Naiad.Dataflow.Iteration; +using Microsoft.Research.Naiad.DataStructures; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Frameworks.Reduction; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.PageRankV2 { + + public class PageRankV2 : Example + { + + public IEnumerable read_edges(string filename) + { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) + { + var elements = reader.ReadLine().Split(' '); + yield return new IntInt(Convert.ToInt32(elements[0]),Convert.ToInt32(elements[1])); + } + } + + public IEnumerable read_pr(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntDouble(Convert.ToInt32(elements[0]),Convert.ToDouble(elements[1])); + } + } + + public struct Reducernode_cnt : IAddable + { + + public int value0; + + public Reducernode_cnt(int value0) + { + this.value0 = value0; + } + + public Reducernode_cnt Add(Reducernode_cnt other) + { + value0 += other.value0; + return this; + } + + } + + public struct Reducerpr1 : IAddable + { + + public double value0; + + public Reducerpr1(double value0) + { + this.value0 = value0; + } + + public Reducerpr1 Add(Reducerpr1 other) + { + value0 += other.value0; + return this; + } + + } + + + public string Usage {get {return "";} } + + public static Stream> PageRankStep( + Stream> pr, + Stream edgescnt, + LoopContext lc) + { + return edgescnt.TransmitAlong1(pr) + .Select(row => new IntDouble(row.second, row.fourth / row.third)) + .GenericAggregator(row => row.first, + row => new Reducepr1(row.second)) + .Select(row => new IntDouble(row.First, row.Second.value0 * 0.85 + 0.15)); + } + + public void Execute(string[] args) { + using (var computation = Naiad.NewComputation.FromArgs(ref args)) { + var edges_input = new BatchedDataSource(); + var pr_input = new BatchedDataSource(); + var edges = computation.NewInput(edges_input).SelectMany(line => read_edges(line)); + var pr = graph.NewInput(pr_input).SelectMany(line => read_pr(line)); + var node_cnt_tmp = + edges.GenericAggregator(row => row.first, + row => new Reducernode_cnt(1)); + var node_cnt = node_cnt_tmp.Select(row => { Reducernode_cnt aggVal = (Reducernode_cnt)row.Second ; return new IntInt(row.First, aggVal.value0); }); + var edgescnt = edges.Join(node_cnt, + (IntInt left) => left.first, + (IntInt right) => right.first, + (IntInt left, IntInt right) => new IntIntInt(left.first, left.second , right.second)); + + pr = pr.IterateAndAccumulate((lc, deltas) => deltas.PageRankStep(edgescnt, lc), + x => x.first, + 5, + "PageRank"); + + int minThreadId = computation.Configuration.ProcessID * + computation.Configuration.WorkerCount; + + StreamWriter[] file_pr = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_pr[i] = new StreamWriter("pr" + j + ".out"); + } + + pr.Subscribe((i, l) => { foreach (var element in l) file_pr[i - minThreadId].WriteLine(element); }); + + computation.Activate(); + edges_input.OnCompleted("/tmp//edges/edges" + computation.Configuration.ProcessID + ".in"); + pr_input.OnCompleted("/tmp/pr/pr" + computation.Configuration.ProcessID + ".in"); + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_pr[i].Close(); + } + } + } + + public string Help { + get { + return "PageRankV2"; + } + } + } + +} \ No newline at end of file diff --git a/Examples/Naiad/Project.cs b/Examples/Naiad/Project.cs new file mode 100644 index 0000000..6c0f252 --- /dev/null +++ b/Examples/Naiad/Project.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using System.Diagnostics; +using System.IO; +using System.Text; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.Project { + + public class Project : Example + { + + public IEnumerable read_rel(string filename) + { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntPair(Convert.ToInt32(elements[0]), + Convert.ToInt32(elements[1])); + } + } + + public string Usage {get {return "";} } + + public void Execute(string[] args) { + using (var computation = Naiad.NewComputation.FromArgs(ref args)) { + var input = new BatchedDataSource(); + var rel = computation.NewInput(input).SelectMany(filename => read_rel(filename)); + + var output = rel.Select(row => row.s); + + int minThreadId = computation.Configuration.ProcessID * + computation.Configuration.WorkerCount; + + StreamWriter[] file_output = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_output[i] = new StreamWriter(args[2] + j + ".out"); + } + output.Subscribe((i, l) => { foreach (var element in l) file_output[i - minThreadId].WriteLine(element); }); + computation.Activate(); + input.OnCompleted(args[1]); + + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_output[i].Close(); + } + } + } + + public string Help { + get { + return "Project "; + } + } + } + +} diff --git a/Examples/Naiad/ReadTest.cs b/Examples/Naiad/ReadTest.cs new file mode 100644 index 0000000..72d20a1 --- /dev/null +++ b/Examples/Naiad/ReadTest.cs @@ -0,0 +1,156 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading.Tasks; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Frameworks.Reduction; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.ReadTest +{ + + public class ReadTest : Example { + + public struct IntIntDouble : IEquatable { + public int first; + public int second; + public double third; + + public bool Equals(IntIntDouble that) { + return first == that.first && second == that.second && + Math.Abs(third - that.third) < 0.0000001; + } + + public int CompareTo(IntIntDouble that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + return this.third - that.third < 0 ? -1 : 1; + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntIntDouble(int x, int y, double z) { + first = x; + second = y; + third = z; + } + } + + public struct IntStringString : IEquatable { + public int first; + public string second; + public string third; + + public bool Equals(IntStringString that) { + return first == that.first && second.Equals(that.second) && + third.Equals(that.third); + } + + public int CompareTo(IntStringString that) { + if (this.first != that.first) + return this.first - that.first; + + if (!this.second.Equals(that.second)) + return this.second.CompareTo(that.second); + + return this.third.CompareTo(that.third); + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second.GetHashCode() + 4311 * third.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntStringString(int x, string y, string z) { + first = x; + second = y; + third = z; + } + } + + public IEnumerable read_part(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntStringString(Convert.ToInt32(elements[0]), + elements[3], elements[6]); + } + } + + public IEnumerable read_lineitem(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntIntDouble(Convert.ToInt32(elements[1]), + Convert.ToInt32(elements[4]), + Convert.ToDouble(elements[5])); + } + } + + public string Usage {get {return "";} } + + public void Execute(string[] args) { + using (var computation = Naiad.NewComputation.FromArgs(ref args)) { + var part_input = new BatchedDataSource(); + var lineitem_input = new BatchedDataSource(); + bool multi_machine = true; + if (args.Length == 2) { + multi_machine = Convert.ToBoolean(args[1]); + } + var part = computation.NewInput(part_input).SelectMany(line => read_part(line)); + var lineitem = computation.NewInput(lineitem_input).SelectMany(line => read_lineitem(line)); + + if (multi_machine) { + part.Subscribe((i, l) => { }); + lineitem.Subscribe((i, l) => { }); + computation.Activate(); + part_input.OnCompleted("/tmp/part/part" + + computation.Configuration.ProcessID + ".in"); + lineitem_input.OnCompleted("/tmp/lineitem/lineitem" + + computation.Configuration.ProcessID + ".in"); + } else { + part.Subscribe(l => { }); + lineitem.Subscribe(l => { }); + computation.Activate(); + if (computation.Configuration.ProcessID == 0) { + part_input.OnCompleted("/tmp/part/part.in"); + lineitem_input.OnCompleted("/tmp/lineitem/lineitem.in"); + } else { + part_input.OnNext(); + lineitem_input.OnNext(); + part_input.OnCompleted(); + lineitem_input.OnCompleted(); + } + } + computation.Join(); + } + } + + public string Help { + get { + return "ReadTest [multi_machine]"; + } + } + } + +} \ No newline at end of file diff --git a/Examples/Naiad/SSSP.cs b/Examples/Naiad/SSSP.cs new file mode 100644 index 0000000..c1580be --- /dev/null +++ b/Examples/Naiad/SSSP.cs @@ -0,0 +1,185 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading.Tasks; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Frameworks.Reduction; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.SSSP { + + public class SSSP : Example { + + public struct IntIntInt : IEquatable { + public int first; + public int second; + public int third; + + public bool Equals(IntIntInt that) { + return first == that.first && second == that.second && third == that.third; + } + + public int CompareTo(IntIntInt that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + return this.third - that.third; + } + + // Embarassing hashcodes + public override int GetHashCode() + { + return first + 1234347 * second + 4311 * third; + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntIntInt(int x, int y, int z) { + first = x; + second = y; + third = z; + } + } + + public struct IntInt : IEquatable { + public int first; + public int second; + + public bool Equals(IntInt that) { + return first == that.first && second == that.second; + } + + public int CompareTo(IntInt that) { + if (this.first != that.first) + return this.first - that.first; + + return this.second - that.second; + } + + public override int GetHashCode() { + return 47 * first + 36425232 * second; + } + + public override string ToString() { + return String.Format("{0} {1}", first, second); + } + + public IntInt(int ss, int tt) { + first = ss; + second = tt; + } + + } + + public IEnumerable ReadVertices(string filename) + { + if (File.Exists(filename)) { + var file = File.OpenText(filename); + while (!file.EndOfStream) { + var elements = file.ReadLine().Split(' '); + yield return new IntInt(Convert.ToInt32(elements[0]), + Convert.ToInt32(elements[1])); + } + } else { + Console.WriteLine("File not found! {0}", filename); + } + } + + public IEnumerable ReadEdges(string filename) + { + if (File.Exists(filename)) { + var file = File.OpenText(filename); + while (!file.EndOfStream) { + var elements = file.ReadLine().Split(' '); + yield return new IntIntInt(Convert.ToInt32(elements[0]), + Convert.ToInt32(elements[1]), + Convert.ToInt32(elements[2])); + } + } else { + Console.WriteLine("File not found! {0}", filename); + } + } + + public IEnumerable MinCosts(int id, IEnumerable costs) { + int minCost = int.MaxValue; + foreach (var cost in costs) { + minCost = Math.Min(minCost, cost.second); + } + yield return new IntInt(id, minCost); + } + + public Stream SSSPIter( + Stream edges_in, + Stream vertices_in) + where T: Time { + return edges_in.Join(vertices_in, + (IntIntInt edge) => edge.first, + (IntInt vertex) => vertex.first, + (IntIntInt edge, IntInt vertex) => + new IntInt(edge.second, vertex.second + edge.third)) + .Union(vertices_in) + .Min(row => row.first, + row => row.second) + .Select(row => new IntInt(row.First, row.Second)); +// .GroupBy(row => row.first, +// (id, costs) => MinCosts(id, costs)); + } + + public string Usage {get {return " ";} } + + public void Execute(string[] args) { + using (var computation = Naiad.NewComputation.FromArgs(ref args)) { + var stopwatch = System.Diagnostics.Stopwatch.StartNew(); + + var edges = new BatchedDataSource(); + var vertices = new BatchedDataSource(); + + var edges_in = computation.NewInput(edges) + .SelectMany(file => ReadEdges(file)); + var vertices_in = computation.NewInput(vertices) + .SelectMany(file => ReadVertices(file)); + + for (int i = 0; i < 5; ++i) { + vertices_in = SSSPIter(edges_in, vertices_in); + } + + int minThreadId = computation.Configuration.ProcessID * + computation.Configuration.WorkerCount; + StreamWriter[] file_out = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_out[i] = new StreamWriter(args[2] + "/dij_vertices" + j + ".out"); + } + + vertices_in.Subscribe((i, l) => { foreach (var element in l) file_out[i - minThreadId].WriteLine(element); }); + + computation.Activate(); + edges.OnCompleted(args[2] + "/sssp_" + args[1] + "_edges" + computation.Configuration.ProcessID + ".in"); + vertices.OnCompleted(args[2] + "/sssp_" + args[1] + "_vertices" + computation.Configuration.ProcessID + ".in"); + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_out[i].Close(); + } + } + } + + public string Help { + get { + return "SSSP "; + } + } + + } + +} diff --git a/Examples/Naiad/Shopper.cs b/Examples/Naiad/Shopper.cs new file mode 100644 index 0000000..7f546bd --- /dev/null +++ b/Examples/Naiad/Shopper.cs @@ -0,0 +1,227 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using System.Diagnostics; +using System.IO; +using System.Text; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Frameworks.Reduction; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.Shopper { + + public class Shopper : Example { + + public struct IntInt : IEquatable { + public int first; + public int second; + + public bool Equals(IntInt that) { + return first == that.first && second == that.second; + } + + public int CompareTo(IntInt that) { + if (this.first != that.first) + return this.first - that.first; + + return this.second - that.second; + } + + public override int GetHashCode() { + return 47 * first + 36425232 * second; + } + + public override string ToString() { + return String.Format("{0} {1}", first, second); + } + + public IntInt(int ss, int tt) { + first = ss; + second = tt; + } + + } + + public struct IntIntIntInt : IEquatable { + public int first; + public int second; + public int third; + public int fourth; + + public bool Equals(IntIntIntInt that) { + return first == that.first && second == that.second && third == that.third && + fourth == that.fourth; + } + + public int CompareTo(IntIntIntIntInt that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + if (this.third != that.third) + return this.third - that.third; + + return this.fourth - that.fourth; + } + + // Embarassing hashcodes + public override int GetHashCode() { + return 31 * first + 1234347 * second + 4311 * third + 12315 * fourth; + } + + public override string ToString() { + return String.Format("{0} {1} {2} {3}", first, second, third, fourth); + } + + public IntIntIntInt(int x, int y, int z, int w) { + first = x; second = y; third = z; fourth = w; + } + } + + public struct IntIntIntIntInt : IEquatable { + public int first; + public int second; + public int third; + public int fourth; + public int fifth; + + public bool Equals(IntIntIntIntInt that) { + return first == that.first && second == that.second && third == that.third && + fourth == that.fourth && fifth == that.fifth; + } + + public int CompareTo(IntIntIntIntInt that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + if (this.third != that.third) + return this.third - that.third; + + if (this.fourth != that.fourth) + return this.fourth - that.fourth; + + return this.fifth - that.fifth; + } + + // Embarassing hashcodes + public override int GetHashCode() { + return 31 * first + 1234347 * second + 4311 * third + 12315 * fourth + 3 * fifth; + } + + public override string ToString() { + return String.Format("{0} {1} {2} {3} {4}", first, second, third, fourth, fifth); + } + + public IntIntIntIntInt(int x, int y, int z, int w, int xx) { + first = x; second = y; third = z; fourth = w; fifth = xx; + } + } + + public struct ReducerShop : IAddable { + public int value; + + public ReducerShop(int value) { + this.value = value; + } + + public ReducerShop Add(ReducerShop other) { + this.value += other.value; + return this; + } + } + + public IEnumerable readShopLogs(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntIntIntInt(Convert.ToInt32(elements[0]), + Convert.ToInt32(elements[1]), + Convert.ToInt32(elements[2]), + Convert.ToInt32(elements[3])); + } + } + + public IEnumerable generateShopLogs(string filename) { + int numUsers = 100000000; + int numCountries = 50; + int numProducts = 10000; + Random random = new Random(); + for (int curUser = 0; curUser < numUsers; curUser++) { + int numProductsBought = random.Next(1, 30); + for (int curProd = 0; curProd < numProductsBought; curProd++) + { + // int productId = random.Next(0, numProducts); + // int price = random.Next(1, 300); + // if (curUser % 2 == 0) { + // yield return new IntIntIntInt(curUser, 1, productId, price); + // } else { + // int countryId = random.Next(2, numCountries); + // yield return new IntIntIntInt(curUser, countryId, productId, price); + // } + yield return new IntIntIntInt(curUser, curUser % 2, 1, 100); + } + } + } + + public IEnumerable Sum(int id, IEnumerable logs) { + int aggVal = 0; + foreach (var log in logs) + aggVal += log.fourth; + yield return new IntInt(id, aggVal); + } + + public string Usage {get {return "shopping_file";} } + + public void Execute(string[] args) { + using (var computation = Naiad.NewComputation.FromArgs(ref args)) { + var shopLogs = new BatchedDataSource(); + var topSpenders = computation.NewInput(shopLogs) + .SelectMany(line => generateShopLogs(line)) + .Where(row => row.second == 1) + .GenericAggregator(row => row.first, + row => new ReducerShop(row.fourth)) + .Select(row => new IntInt(row.First, row.Second.value)) + // .GroupBy(row => row.first, + // (id, grouped) => Sum(id, grouped)) + .Where(row => row.second > 12000); + + // Open output files. + int minThreadId = computation.Configuration.ProcessID * + computation.Configuration.WorkerCount; + StreamWriter[] file_out = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_out[i] = new StreamWriter(args[2] + j + ".out"); + } + + topSpenders.Subscribe((i, l) => { + foreach (var element in l) + file_out[i - minThreadId].WriteLine(element); }); + + computation.Activate(); + shopLogs.OnCompleted(args[1]); + computation.Join(); + // Close output files. + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_out[i].Close(); + } + } + } + public string Help { + get { + return "Shopper "; + } + } + } + +} \ No newline at end of file diff --git a/Examples/Naiad/TPCH.cs b/Examples/Naiad/TPCH.cs new file mode 100644 index 0000000..c8ce5ed --- /dev/null +++ b/Examples/Naiad/TPCH.cs @@ -0,0 +1,316 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading.Tasks; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Frameworks.Reduction; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.TPCH { + + public class TPCH : Example { + + public struct IntDouble : IEquatable { + public int first; + public double second; + + public bool Equals(IntDouble that) { + return first == that.first && second == that.second; + } + + public int CompareTo(IntDouble that) { + if (first != that.first) + return first - that.first; + if (second < that.second) { + return -1; + } else if (second > that.second) { + return 1; + } else { + return 0; + } + } + + public override int GetHashCode() { + return 47 * first + 36425232 * second.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1}", first, second); + } + + public IntDouble(int x, double y) { + first = x; + second = y; + } + + } + + public struct IntStringString : IEquatable { + public int first; + public string second; + public string third; + + public bool Equals(IntStringString that) { + return first == that.first && second.Equals(that.second) && + third.Equals(that.third); + } + + public int CompareTo(IntStringString that) { + if (this.first != that.first) + return this.first - that.first; + + if (!this.second.Equals(that.second)) + return this.second.CompareTo(that.second); + + return this.third.CompareTo(that.third); + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second.GetHashCode() + 4311 * third.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntStringString(int x, string y, string z) { + first = x; + second = y; + third = z; + } + } + + public struct IntIntDouble : IEquatable { + public int first; + public int second; + public double third; + + public bool Equals(IntIntDouble that) { + return first == that.first && second == that.second && + Math.Abs(third - that.third) < 0.0000001; + } + + public int CompareTo(IntIntDouble that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + return this.third - that.third < 0 ? -1 : 1; + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntIntDouble(int x, int y, double z) { + first = x; + second = y; + third = z; + } + } + + public struct IntIntDoubleStringString : IEquatable { + public int first; + public int second; + public double third; + public string fourth; + public string fifth; + + public bool Equals(IntIntDoubleStringString that) { + return first == that.first && second == that.second && + Math.Abs(third - that.third) < 0.0000001 && + fourth.Equals(that.fourth) && fifth.Equals(that.fifth); + } + + public int CompareTo(IntIntDoubleStringString that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + if (Math.Abs(this.third - that.third) > 0.0000001) + return this.third - that.third < 0 ? -1 : 1; + + if (!this.fourth.Equals(that.fourth)) + return this.fourth.CompareTo(that.fourth); + + return this.fifth.CompareTo(that.fifth); + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third.GetHashCode() + 42 * fourth.GetHashCode() + + 17 * fifth.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2} {3} {4}", first, second, third, fourth, fifth); + } + + public IntIntDoubleStringString(int x, int y, double z, string xx, string yy) { + first = x; + second = y; + third = z; + fourth = xx; + fifth = yy; + } + } + + public struct IntIntDoubleStringStringDouble : IEquatable { + public int first; + public int second; + public double third; + public string fourth; + public string fifth; + public double sixth; + + public bool Equals(IntIntDoubleStringStringDouble that) { + return first == that.first && second == that.second && + Math.Abs(third - that.third) < 0.0000001 && + fourth.Equals(that.fourth) && fifth.Equals(that.fifth) && + Math.Abs(sixth - that.sixth) < 0.0000001; + } + + public int CompareTo(IntIntDoubleStringStringDouble that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + if (Math.Abs(this.third - that.third) > 0.0000001) + return this.third - that.third < 0 ? -1 : 1; + + if (!this.fourth.Equals(that.fourth)) + return this.fourth.CompareTo(that.fourth); + + if (!this.fifth.Equals(that.fifth)) + return this.fifth.CompareTo(that.fifth); + + return this.sixth.CompareTo(that.sixth); + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third.GetHashCode() + 42 * fourth.GetHashCode() + + 17 * fifth.GetHashCode() + 13 * sixth.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2} {3} {4} {5}", first, second, + third, fourth, fifth, sixth); + } + + public IntIntDoubleStringStringDouble(int x, int y, double z, string xx, + string yy, double xxx) { + first = x; + second = y; + third = z; + fourth = xx; + fifth = yy; + sixth = xxx; + } + } + + public IEnumerable read_part(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntStringString(Convert.ToInt32(elements[0]), + elements[3], elements[6]); + } + } + + public IEnumerable read_lineitem(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntIntDouble(Convert.ToInt32(elements[1]), + Convert.ToInt32(elements[4]), + Convert.ToDouble(elements[5])); + } + } + + public IEnumerable agg_cnt_lineitem(int key, IEnumerable values) { + int cntValue = 0; + int aggValue = 0; + foreach (var value in values) { + cntValue++; + aggValue += value.second; + } + yield return new IntDouble(key, aggValue / cntValue * 0.2); + } + + public string Usage {get {return "";} } + + public void Execute(string[] args) { + using (var computation = Naiad.NewComputation.FromArgs(ref args)) { + var part_input = new BatchedDataSource(); + var lineitem_input = new BatchedDataSource(); + var part = computation.NewInput(part_input).SelectMany(line => read_part(line)); + var lineitem = computation.NewInput(lineitem_input).SelectMany(line => read_lineitem(line)); + + var agg_lin = lineitem.IntSum(row => row.first, + row => row.second); + var cnt_lin = lineitem.IntSum(row => row.first, + row => 1); + var avg_lineitem = agg_lin.Join(cnt_lin, + agg_row => agg_row.First, + cnt_row => cnt_row.First, + (agg_row, cnt_row) => new IntDouble(agg_row.First, agg_row.Second / cnt_row.Second * 0.2)); + +// var avg_lineitem = lineitem.GroupBy(row => row.first, +// (key, selected) => agg_cnt_lineitem(key, selected)); + + var linepart = lineitem.Join(part, + (IntIntDouble left) => left.first, + (IntStringString right) => right.first, + (IntIntDouble left, IntStringString right) => new IntIntDoubleStringString(left.first, left.second, left.third , right.second, right.third)); + + var lineavg = linepart.Join(avg_lineitem, + (IntIntDoubleStringString left) => left.first, + (IntDouble right) => right.first, + (IntIntDoubleStringString left, IntDouble right) => new IntIntDoubleStringStringDouble(left.first, left.second, left.third, left.fourth, left.fifth , right.second)); + var avg_yearly = lineavg.Where(row => ((row.fourth == "Brand#13") && ((row.fifth == "MEDBAG") && (row.second > row.sixth)))).Select(row => row.third); + + int minThreadId = computation.Configuration.ProcessID * computation.Configuration.WorkerCount; + StreamWriter[] file_avg_yearly = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_avg_yearly[i] = new StreamWriter(args[1] + "/avg_yearly" + j + ".out"); + } + avg_yearly.Subscribe((i, l) => { foreach (var element in l) file_avg_yearly[i - minThreadId].WriteLine(element); }); + + computation.Activate(); + part_input.OnCompleted(args[1] + "/part" + computation.Configuration.ProcessID + ".in"); + lineitem_input.OnCompleted(args[1] + "/lineitem" + computation.Configuration.ProcessID + ".in"); + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_avg_yearly[i].Close(); + } + } + } + + public string Help { + get { + return "TPCH "; + } + } + + } + +} diff --git a/Examples/Naiad/TPCHLindi.cs b/Examples/Naiad/TPCHLindi.cs new file mode 100644 index 0000000..b9ab91c --- /dev/null +++ b/Examples/Naiad/TPCHLindi.cs @@ -0,0 +1,309 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading.Tasks; + +using Microsoft.Research.Naiad; +using Microsoft.Research.Naiad.Dataflow; +using Microsoft.Research.Naiad.Frameworks.Lindi; +using Microsoft.Research.Naiad.Frameworks.Reduction; +using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow; +using Microsoft.Research.Naiad.Input; + +namespace Microsoft.Research.Naiad.Examples.TPCH { + + public class TPCH : Example { + + public struct IntIntDoubleStringString : IEquatable { + public int first; + public int second; + public double third; + public string fourth; + public string fifth; + + public bool Equals(IntIntDoubleStringString that) { + return first == that.first && second == that.second && + Math.Abs(third - that.third) < 0.0000001 && + fourth.Equals(that.fourth) && fifth.Equals(that.fifth); + } + + public int CompareTo(IntIntDoubleStringString that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + if (Math.Abs(this.third - that.third) > 0.0000001) + return this.third - that.third < 0 ? -1 : 1; + + if (!this.fourth.Equals(that.fourth)) + return this.fourth.CompareTo(that.fourth); + + return this.fifth.CompareTo(that.fifth); + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third.GetHashCode() + 42 * fourth.GetHashCode() + + 17 * fifth.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2} {3} {4}", first, second, third, fourth, fifth); + } + + public IntIntDoubleStringString(int x, int y, double z, string xx, string yy) { + first = x; + second = y; + third = z; + fourth = xx; + fifth = yy; + } + } + + public struct IntIntDoubleStringStringDouble : IEquatable { + public int first; + public int second; + public double third; + public string fourth; + public string fifth; + public double sixth; + + public bool Equals(IntIntDoubleStringStringDouble that) { + return first == that.first && second == that.second && + Math.Abs(third - that.third) < 0.0000001 && + fourth.Equals(that.fourth) && fifth.Equals(that.fifth) && + Math.Abs(sixth - that.sixth) < 0.0000001; + } + + public int CompareTo(IntIntDoubleStringStringDouble that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + + if (Math.Abs(this.third - that.third) > 0.0000001) + return this.third - that.third < 0 ? -1 : 1; + + if (!this.fourth.Equals(that.fourth)) + return this.fourth.CompareTo(that.fourth); + + if (!this.fifth.Equals(that.fifth)) + return this.fifth.CompareTo(that.fifth); + + return this.sixth.CompareTo(that.sixth); + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third.GetHashCode() + 42 * fourth.GetHashCode() + + 17 * fifth.GetHashCode() + 13 * sixth.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2} {3} {4} {5}", first, second, + third, fourth, fifth, sixth); + } + + public IntIntDoubleStringStringDouble(int x, int y, double z, string xx, + string yy, double xxx) { + first = x; + second = y; + third = z; + fourth = xx; + fifth = yy; + sixth = xxx; + } + } + + public struct IntDouble : IEquatable { + + public int first; + public double second; + + public bool Equals(IntDouble that) { + return first == that.first && second == that.second; + } + + public int CompareTo(IntDouble that) { + if (first != that.first) + return first - that.first; + if (second < that.second) { + return -1; + } else if (second > that.second) { + return 1; + } else { + return 0; + } + } + + public override int GetHashCode() { + return 47 * first + 36425232 * second.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1}", first, second); + } + + public IntDouble(int x, double y) { + first = x; + second = y; + } + + } + + public struct IntStringString : IEquatable { + + public int first; + public string second; + public string third; + + public bool Equals(IntStringString that) { + return first == that.first && second.Equals(that.second) && + third.Equals(that.third); + } + + public int CompareTo(IntStringString that) { + if (this.first != that.first) + return this.first - that.first; + + if (!this.second.Equals(that.second)) + return this.second.CompareTo(that.second); + + return this.third.CompareTo(that.third); + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second.GetHashCode() + 4311 * third.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntStringString(int x, string y, string z) { + first = x; + second = y; + third = z; + } + } + + public struct IntIntDouble : IEquatable { + public int first; + public int second; + public double third; + + public bool Equals(IntIntDouble that) { + return first == that.first && second == that.second && + Math.Abs(third - that.third) < 0.0000001; + } + + public int CompareTo(IntIntDouble that) { + if (this.first != that.first) + return this.first - that.first; + + if (this.second != that.second) + return this.second - that.second; + return this.third - that.third < 0 ? -1 : 1; + } + + // Embarassing hashcodes + public override int GetHashCode() { + return first + 1234347 * second + 4311 * third.GetHashCode(); + } + + public override string ToString() { + return String.Format("{0} {1} {2}", first, second, third); + } + + public IntIntDouble(int x, int y, double z) { + first = x; + second = y; + third = z; + } + } + + public IEnumerable read_part(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntStringString(Convert.ToInt32(elements[0]), + elements[3], elements[6]); + } + } + + public IEnumerable read_lineitem(string filename) { + StreamReader reader = File.OpenText(filename); + for (; !reader.EndOfStream; ) { + var elements = reader.ReadLine().Split(' '); + yield return new IntIntDouble(Convert.ToInt32(elements[1]), + Convert.ToInt32(elements[4]), + Convert.ToDouble(elements[5])); + } + } + + public IEnumerable agg_cnt_lineitem(int key, IEnumerable values) { + int cntValue = 0; + int aggValue = 0; + foreach (var value in values) { + cntValue++; + aggValue += value.second; + } + yield return new IntDouble(key, aggValue / cntValue * 0.2); + } + + public string Usage {get {return "";} } + + public void Execute(string[] args) { + using (var computation = NewComputation.FromArgs(ref args)) { + var part_input = new BatchedDataSource(); + var lineitem_input = new BatchedDataSource(); + var part = computation.NewInput(part_input).SelectMany(line => read_part(line)); + var lineitem = computation.NewInput(lineitem_input).SelectMany(line => read_lineitem(line)); + + var avg_lineitem = lineitem.GroupBy(row => row.first, + (key, selected) => agg_cnt_lineitem(key, selected)); + + var linepart = lineitem.Join(part, + (IntIntDouble left) => left.first, + (IntStringString right) => right.first, + (IntIntDouble left, IntStringString right) => new IntIntDoubleStringString(left.first, left.second, left.third , right.second, right.third)); + + var lineavg = linepart.Join(avg_lineitem, + (IntIntDoubleStringString left) => left.first, + (IntDouble right) => right.first, + (IntIntDoubleStringString left, IntDouble right) => new IntIntDoubleStringStringDouble(left.first, left.second, left.third, left.fourth, left.fifth , right.second)); + var avg_yearly = lineavg.Where(row => ((row.fourth == "Brand#13") && ((row.fifth == "MEDBAG") && (row.second > row.sixth)))).Select(row => row.third); + + int minThreadId = computation.Configuration.ProcessID * computation.Configuration.WorkerCount; + StreamWriter[] file_avg_yearly = new StreamWriter[computation.Configuration.WorkerCount]; + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + int j = minThreadId + i; + file_avg_yearly[i] = new StreamWriter("/tmp/home/icg27/avg_yearly/avg_yearly" + j + ".out"); + } + avg_yearly.Subscribe((i, l) => { foreach (var element in l) file_avg_yearly[i - minThreadId].WriteLine(element); }); + + computation.Activate(); + part_input.OnCompleted("/tmp/home/icg27/part/part" + computation.Configuration.ProcessID + ".in"); + lineitem_input.OnCompleted("/tmp/home/icg27/lineitem/lineitem" + computation.Configuration.ProcessID + ".in"); + computation.Join(); + for (int i = 0; i < computation.Configuration.WorkerCount; ++i) { + file_avg_yearly[i].Close(); + } + } + } + + public string Help { + get { + return "TPCH"; + } + } + + } + +} \ No newline at end of file diff --git a/Examples/Program.cs b/Examples/Program.cs index 8373925..e183fcb 100644 --- a/Examples/Program.cs +++ b/Examples/Program.cs @@ -1,13 +1,13 @@ /* * Naiad ver. 0.5 * Copyright (c) Microsoft Corporation - * All rights reserved. + * All rights reserved. * - * Licensed under the Apache License, Version 2.0 (the "License"); + * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * THIS CODE IS PROVIDED ON AN *AS IS* BASIS, WITHOUT WARRANTIES OR * CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT @@ -63,6 +63,17 @@ static void Main(string[] args) examples.Add("lookup", new KeyValueLookup.KeyValueLookup()); examples.Add("connectedcomponents", new ConnectedComponents.ConnectedComponents()); + examples.Add("join", new Join.Join()); + examples.Add("joinlj", new JoinLJ.JoinLJ()); + examples.Add("kmeans", new KMeans.KMeans()); + examples.Add("netflix", new Netflix.Netflix()); + examples.Add("pagerank", new PageRank.PageRank()); + examples.Add("project", new Project.Project()); + examples.Add("readtest", new ReadTest.ReadTest()); + examples.Add("tpch", new TPCH.TPCH()); + examples.Add("shopper", new Shopper.Shopper()); + examples.Add("sssp", new SSSP.SSSP()); + // two examples capable of stressing Naiad's performance examples.Add("benchmark-throughput", new Throughput.Throughput()); examples.Add("benchmark-latency", new Latency.Latency()); @@ -73,7 +84,7 @@ static void Main(string[] args) examples.Add("dd-wordcount", new DifferentialDataflow.WordCount()); examples.Add("dd-searchindex", new DifferentialDataflow.SearchIndex()); examples.Add("dd-graphcoloring", new DifferentialDataflow.GraphColoring()); - + examples.Add("dd-netflix", new DifferentialDataflow.Netflix()); // some GraphLINQ examples examples.Add("graphlinq-reachability", new Examples.GraphLINQ.Reachability()); examples.Add("graphlinq-pagerank", new Examples.GraphLINQ.PageRank()); diff --git a/Frameworks/Lindi/Lindi.cs b/Frameworks/Lindi/Lindi.cs index 27d82e3..3edf75b 100644 --- a/Frameworks/Lindi/Lindi.cs +++ b/Frameworks/Lindi/Lindi.cs @@ -24,6 +24,7 @@ using Microsoft.Research.Naiad.Dataflow; using Microsoft.Research.Naiad.Dataflow.PartitionBy; using Microsoft.Research.Naiad.Dataflow.StandardVertices; +using Microsoft.Research.Naiad.Frameworks.Reduction; using Microsoft.Research.Naiad.Utilities; using Microsoft.Research.Naiad.Dataflow.Iteration; using System.Linq.Expressions; @@ -727,6 +728,19 @@ public static Stream, TTime> Max .Aggregate((a, b) => a.CompareTo(b) > 0 ? a : b, true); } + + + + public static Stream, TTime> IntSum(this Stream stream, Func keySelector, Func valueSelector) where TTime : Time { + return stream.Reduce(keySelector, valueSelector, () => new IntSumReducer(), "IntSum"); + } + + public static Stream, TTime> GenericAggregator(this Stream stream, Func keySelector, Func valueSelector) + where TTime : Time + where TValue : IAddable { + return stream.Reduce, TValue, TValue, TValue, TKey, TRecord, TTime>(keySelector, valueSelector, () => new GenericReducer(), "GenericReducer"); + } + /// /// Groups records using the supplied key selector, and applies the given aggregation function. /// diff --git a/Naiad.sln b/Naiad.sln index c5e9963..e31d7d9 100644 --- a/Naiad.sln +++ b/Naiad.sln @@ -32,12 +32,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkGenerator", "Frameworks\WorkGenerator\WorkGenerator.csproj", "{EBA3D350-41EB-474C-AED9-9CFD1F809DE3}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WebHdfsSupport", "Frameworks\WebHdfsSupport\WebHdfsSupport.csproj", "{5CFC93F6-68C3-45B5-92FA-43F8626EC482}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Storage", "frameworks\Storage\Storage.csproj", "{0DCA9543-FF9D-48D6-9748-A966DC39C35D}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HdfsSupport", "frameworks\HdfsSupport\HdfsSupport.csproj", "{66D2A00E-F889-4B2F-9C40-04A32278FB86}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -378,20 +372,6 @@ Global {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.Debug|x64.ActiveCfg = Debug|x64 {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.Debug|x64.Build.0 = Debug|x64 {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.Debug|x86.ActiveCfg = Debug|Any CPU - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoDebug|Any CPU.ActiveCfg = Debug|Any CPU - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoDebug|Any CPU.Build.0 = Debug|Any CPU - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoDebug|Mixed Platforms.ActiveCfg = Debug|Any CPU - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoDebug|Mixed Platforms.Build.0 = Debug|Any CPU - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoDebug|x64.ActiveCfg = Debug|x64 - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoDebug|x64.Build.0 = Debug|x64 - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoDebug|x86.ActiveCfg = Debug|Any CPU - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoRelease|Any CPU.ActiveCfg = Release|Any CPU - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoRelease|Any CPU.Build.0 = Release|Any CPU - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoRelease|Mixed Platforms.ActiveCfg = Release|Any CPU - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoRelease|Mixed Platforms.Build.0 = Release|Any CPU - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoRelease|x64.ActiveCfg = Release|x64 - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoRelease|x64.Build.0 = Release|x64 - {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.MonoRelease|x86.ActiveCfg = Release|Any CPU {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.Release|Any CPU.ActiveCfg = Release|Any CPU {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.Release|Any CPU.Build.0 = Release|Any CPU {5CFC93F6-68C3-45B5-92FA-43F8626EC482}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU @@ -413,20 +393,6 @@ Global {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.Debug|x64.ActiveCfg = Debug|x64 {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.Debug|x64.Build.0 = Debug|x64 {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.Debug|x86.ActiveCfg = Debug|Any CPU - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoDebug|Any CPU.ActiveCfg = Debug|Any CPU - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoDebug|Any CPU.Build.0 = Debug|Any CPU - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoDebug|Mixed Platforms.ActiveCfg = Debug|Any CPU - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoDebug|Mixed Platforms.Build.0 = Debug|Any CPU - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoDebug|x64.ActiveCfg = Debug|x64 - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoDebug|x64.Build.0 = Debug|x64 - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoDebug|x86.ActiveCfg = Debug|Any CPU - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoRelease|Any CPU.ActiveCfg = Release|Any CPU - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoRelease|Any CPU.Build.0 = Release|Any CPU - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoRelease|Mixed Platforms.ActiveCfg = Release|Any CPU - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoRelease|Mixed Platforms.Build.0 = Release|Any CPU - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoRelease|x64.ActiveCfg = Release|x64 - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoRelease|x64.Build.0 = Release|x64 - {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.MonoRelease|x86.ActiveCfg = Release|Any CPU {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.Release|Any CPU.ActiveCfg = Release|Any CPU {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.Release|Any CPU.Build.0 = Release|Any CPU {0DCA9543-FF9D-48D6-9748-A966DC39C35D}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU @@ -447,18 +413,6 @@ Global {66D2A00E-F889-4B2F-9C40-04A32278FB86}.Debug|x64.ActiveCfg = Debug|x64 {66D2A00E-F889-4B2F-9C40-04A32278FB86}.Debug|x64.Build.0 = Debug|x64 {66D2A00E-F889-4B2F-9C40-04A32278FB86}.Debug|x86.ActiveCfg = Debug|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoDebug|Any CPU.ActiveCfg = Debug|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoDebug|Mixed Platforms.ActiveCfg = Debug|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoDebug|Mixed Platforms.Build.0 = Debug|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoDebug|x64.ActiveCfg = Debug|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoDebug|x64.Build.0 = Debug|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoDebug|x86.ActiveCfg = Debug|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoRelease|Any CPU.ActiveCfg = Release|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoRelease|Mixed Platforms.ActiveCfg = Release|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoRelease|Mixed Platforms.Build.0 = Release|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoRelease|x64.ActiveCfg = Release|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoRelease|x64.Build.0 = Release|x64 - {66D2A00E-F889-4B2F-9C40-04A32278FB86}.MonoRelease|x86.ActiveCfg = Release|x64 {66D2A00E-F889-4B2F-9C40-04A32278FB86}.Release|Any CPU.ActiveCfg = Release|x64 {66D2A00E-F889-4B2F-9C40-04A32278FB86}.Release|Mixed Platforms.ActiveCfg = Release|x64 {66D2A00E-F889-4B2F-9C40-04A32278FB86}.Release|Mixed Platforms.Build.0 = Release|x64 diff --git a/Naiad/Frameworks/Reduce.cs b/Naiad/Frameworks/Reduce.cs index b3243f3..6c7a62e 100644 --- a/Naiad/Frameworks/Reduce.cs +++ b/Naiad/Frameworks/Reduce.cs @@ -35,25 +35,40 @@ namespace Microsoft.Research.Naiad.Frameworks.Reduction { - internal static class ExtensionMethods + + + public abstract class ReducerBase { + + public abstract ReducerBase Add(ReducerBase other); + + public static ReducerBase operator +(ReducerBase l, ReducerBase r) { + return l.Add(r); + } + + } + + public interface IAddable { + T Add(T other); + } + + public static class ExtensionMethods { - - internal static Stream LocalReduce(this Stream stream, Func factory, string name) + public static Stream LocalReduce(this Stream stream, Func factory, string name) where TReducer : IReducer where TTime : Time { return Foundry.NewUnaryStage(stream, (i, v) => new LocalReduceVertex(i, v, factory), null, null, name); } - internal static Stream LocalCombine(this Stream stream, Func factory, string name) + public static Stream LocalCombine(this Stream stream, Func factory, string name) where TReducer : IReducer where TTime : Time { return Foundry.NewUnaryStage(stream, (i, v) => new LocalCombineVertex(i, v, factory), null, null, name); } - internal static Stream, TTime> LocalReduce( + public static Stream, TTime> LocalReduce( this Stream stream, Func key, Func val, Func factory, string name, Expression> inPlacement, Expression, int>> outPlacement) where TReducer : IReducer @@ -62,7 +77,7 @@ internal static Stream, TTime> LocalReduce new LocalKeyedReduceVertex(i, v, key, val, factory), inPlacement, outPlacement, name); } - internal static Stream, T> LocalTimeReduce( + public static Stream, T> LocalTimeReduce( this Stream stream, Func key, Func val, Func factory, string name, Expression> inPlacement, Expression, int>> outPlacement) where A : IReducer @@ -71,7 +86,7 @@ internal static Stream, T> LocalTimeReduce( return Foundry.NewUnaryStage(stream, (i, v) => new LocalTimeKeyedReduceVertex(i, v, key, val, factory), inPlacement, outPlacement, name); } - internal static Stream, T> LocalReduce( + public static Stream, T> LocalReduce( this Stream stream, Func key, Func val, Func factory, string name) where A : IReducer where T : Time @@ -79,7 +94,7 @@ internal static Stream, T> LocalReduce( return LocalReduce(stream, key, val, factory, name, null, null); } - internal static Stream, T> LocalCombine( + public static Stream, T> LocalCombine( this Stream, T> stream, Func factory, string name, Expression, int>> outPlacement) where A : IReducer @@ -95,7 +110,7 @@ internal static Stream, T> LocalCombine( return Foundry.NewUnaryStage(stream, (i, v) => new LocalKeyedCombineVertex(i, v, factory), inPlacement, outPlacement, name); } - internal static Stream, T> LocalTimeCombine( + public static Stream, T> LocalTimeCombine( this Stream, T> stream, Func factory, string name, Expression, int>> outPlacement) where A : IReducer @@ -110,7 +125,7 @@ internal static Stream, T> LocalTimeCombine( return Foundry.NewUnaryStage(stream, (i, v) => new LocalTimeKeyedCombineVertex(i, v, factory), inPlacement, outPlacement, name); } - internal static Stream, T> LocalCombine( + public static Stream, T> LocalCombine( this Stream, T> stream, Func factory, string name) where A : IReducer where T : Time @@ -118,7 +133,7 @@ internal static Stream, T> LocalCombine( return stream.LocalCombine(factory, name, null); } - internal static Stream, T> Reduce( + public static Stream, T> Reduce( this Stream stream, Func key, Func val, Func factory, string name) where A : IReducer where T : Time @@ -128,7 +143,7 @@ internal static Stream, T> Reduce( LocalCombine(factory, name + "Combine", x => x.First.GetHashCode()); } - internal static Stream Broadcast(this Stream stream) + public static Stream Broadcast(this Stream stream) where R : Cloneable where T : Time { @@ -159,7 +174,7 @@ internal static Stream Broadcast(this Stream stream) return collectable.UnaryExpression(null, xs => xs.Select(x => x.Second), "Select"); } - internal static Stream BroadcastReduce(this Stream stream, Func factory, string name) + public static Stream BroadcastReduce(this Stream stream, Func factory, string name) where A : IReducer where X : Cloneable where T : Time @@ -167,7 +182,7 @@ internal static Stream BroadcastReduce(this Stream st return stream.LocalReduce(factory, name + "Reduce").Broadcast().LocalCombine(factory, name + "Combine"); } - internal static Stream BroadcastReduce(this Stream stream, Func factory, string name) + public static Stream BroadcastReduce(this Stream stream, Func factory, string name) where A : IReducer where X : Cloneable where T : Time @@ -179,7 +194,7 @@ internal static Stream BroadcastReduce(this Stream stream, namespace Microsoft.Research.Naiad.Frameworks.Reduction { - internal interface IReducer + public interface IReducer { // Accumulate an object of type R void Add(TInput r); @@ -202,7 +217,7 @@ internal interface IReducer // Placeholder type for generics that support reduction, used when // reduction is not needed - internal struct DummyReducer : IReducer + public struct DummyReducer : IReducer { public void Add(X x) { @@ -235,7 +250,7 @@ public X Value() } } - internal class Aggregation : IReducer + public class Aggregation : IReducer { public void InitialAdd(R other) { @@ -289,7 +304,7 @@ public Aggregation(Func a, Func ia, Func c, Func f X value; } - internal class Aggregation : Aggregation + public class Aggregation : Aggregation { public Aggregation(Func c) : base(c, x => x, c, x => x) @@ -297,7 +312,7 @@ public Aggregation(Func c) } } - internal struct CountReducer : IReducer + public struct CountReducer : IReducer { public void InitialAdd(S s) { value = 1; } public void Add(S t) { ++value; } @@ -308,7 +323,7 @@ internal struct CountReducer : IReducer private Int64 value; } - internal struct IntSumReducer : IReducer + public struct IntSumReducer : IReducer { public void InitialAdd(Int64 r) { value = r; } public void Add(Int64 r) { value += r; } @@ -319,7 +334,7 @@ internal struct IntSumReducer : IReducer private Int64 value; } - internal struct FloatSumReducer : IReducer + public struct FloatSumReducer : IReducer { public void InitialAdd(float r) { value = r; } public void Add(float r) { value += r; } @@ -330,7 +345,7 @@ internal struct FloatSumReducer : IReducer private float value; } - internal struct DoubleSumReducer : IReducer + public struct DoubleSumReducer : IReducer { public void InitialAdd(double r) { value = r; } public void Add(double r) { value += r; } @@ -341,7 +356,36 @@ internal struct DoubleSumReducer : IReducer private double value; } - internal struct MinReducer : IReducer where T : IComparable + public struct GenericReducer : IReducer where T : IAddable { + + public void InitialAdd(T r) { + value = r; + } + + public void Add(T r) { + value.Add(r); + } + + public void InitialCombine(T other) { + value = other; + } + + public void Combine(T other) { + value.Add(other); + } + + public T State() { + return value; + } + + public T Value() { + return value; + } + + private T value; + } + + public struct MinReducer : IReducer where T : IComparable { public void InitialAdd(T r) { value = r; } public void Add(T r) { value = r.CompareTo(value) < 0 ? r : value; } @@ -352,7 +396,7 @@ internal struct MinReducer : IReducer where T : IComparable private T value; } - internal struct MaxReducer : IReducer where T : IComparable + public struct MaxReducer : IReducer where T : IComparable { public void InitialAdd(T r) { value = r; } public void Add(T r) { value = r.CompareTo(value) > 0 ? r : value; } @@ -363,7 +407,7 @@ internal struct MaxReducer : IReducer where T : IComparable private T value; } - internal class LocalReduceVertex : UnaryVertex + public class LocalReduceVertex : UnaryVertex where A : IReducer where T : Time { @@ -413,7 +457,7 @@ public LocalReduceVertex(int index, Stage stage, Func factory) } } - internal class LocalCombineVertex : UnaryVertex + public class LocalCombineVertex : UnaryVertex where A : IReducer where T : Time { @@ -463,7 +507,7 @@ public LocalCombineVertex(int index, Stage stage, Func factory) } } - internal class LocalKeyedReduceVertex : UnaryVertex, T> + public class LocalKeyedReduceVertex : UnaryVertex, T> where A : IReducer where T : Time { @@ -558,7 +602,7 @@ public LocalKeyedReduceVertex( } } - internal class LocalTimeKeyedReduceVertex : UnaryVertex, T> + public class LocalTimeKeyedReduceVertex : UnaryVertex, T> where A : IReducer where T : Time { @@ -644,7 +688,7 @@ public LocalTimeKeyedReduceVertex( } - internal class LocalKeyedCombineVertex : UnaryVertex, Pair, T> + public class LocalKeyedCombineVertex : UnaryVertex, Pair, T> where A : IReducer where T : Time { @@ -739,7 +783,7 @@ public LocalKeyedCombineVertex( } } - internal class LocalTimeKeyedCombineVertex : UnaryVertex, Pair, T> + public class LocalTimeKeyedCombineVertex : UnaryVertex, Pair, T> where A : IReducer where T : Time { @@ -819,14 +863,14 @@ public LocalTimeKeyedCombineVertex( } } - internal interface Cloneable + public interface Cloneable { // Return a copy suitable for broadcasting. Can be a shallow copy if the object // is known to be immutable T MakeCopy(); } - internal class BroadcastSendVertex : UnaryVertex, T> + public class BroadcastSendVertex : UnaryVertex, T> where R : Cloneable where T : Time { @@ -863,7 +907,7 @@ public BroadcastSendVertex(int index, Stage stage, int[] dest) } } - internal class BroadcastForwardVertex : UnaryVertex, Pair, T> + public class BroadcastForwardVertex : UnaryVertex, Pair, T> where R : Cloneable where T : Time { diff --git a/build_mono.sh b/build_mono.sh old mode 100644 new mode 100755 index c3a3e62..3d7d151 --- a/build_mono.sh +++ b/build_mono.sh @@ -1,7 +1,14 @@ #!/bin/sh +# Grab certificates necessary for NuGet +mozroots --import --sync + # To build in Debug mode. Binaries will be placed in $PROJ/bin/Debug. xbuild /p:Configuration="MonoDebug" +echo "---------------------------------------------" +echo "Mono Debug build done. For a release build, change build_mono.sh" +echo "and uncomment the \"MonoRelease\" configuration line." +echo "---------------------------------------------" # To build in Release mode. Binaries will be placed in $PROJ/bin/Release. #xbuild /p:Configuration="MonoRelease"