diff --git a/src/Schedgen2/config_example.json b/src/Schedgen2/config_example.json
new file mode 100644
index 0000000..b4a5339
--- /dev/null
+++ b/src/Schedgen2/config_example.json
@@ -0,0 +1,8 @@
+{
+    "ptrn": "allreduce",
+    "algorithm": "ring",
+    "comm_size": 16,
+    "datasize": 1024,
+    "output": "allreduce_ring_16_1024.bin",
+    "txt2bin": "../../build/txt2bin"
+}
\ No newline at end of file
diff --git a/src/Schedgen2/mpi_colls.py b/src/Schedgen2/mpi_colls.py
new file mode 100644
index 0000000..e545bbe
--- /dev/null
+++ b/src/Schedgen2/mpi_colls.py
@@ -0,0 +1,259 @@
+from math import log2, ceil
+import random
+
+from goal import GoalComm
+from patterns import iterative_send_recv, parallel_send_recv, windowed_send_recv
+
+
+def binomialtree(comm_size, datasize, tag, dir="reduce"):
+    comm = GoalComm(comm_size)
+    for rank in range(0, comm_size):
+        send = None
+        recv = None
+        for r in range(0, ceil(log2(comm_size))):
+            peer = rank + pow(2, r)
+            if (rank + pow(2, r) < comm_size) and (rank < pow(2, r)):
+                if dir == "reduce":
+                    recv = comm.Recv(size=datasize, src=peer, dst=rank, tag=tag)
+                elif dir == "bcast":
+                    send = comm.Send(size=datasize, dst=peer, src=rank, tag=tag)
+                else:
+                    raise ValueError(
+                        "direction " + str(dir) + " in binomialtree not implemented."
+                    )
+                if (send is not None) and (recv is not None):
+                    send.requires(recv)
+            peer = rank - pow(2, r)
+            if (rank >= pow(2, r)) and (rank < pow(2, r + 1)):
+                if dir == "reduce":
+                    send = comm.Send(size=datasize, dst=peer, src=rank, tag=tag)
+                if dir == "bcast":
+                    recv = comm.Recv(size=datasize, src=peer, dst=rank, tag=tag)
+
+    return comm
+
+
+def dissemination(comm_size, datasize, tag):
+    comm = GoalComm(comm_size)
+    for rank in range(0, comm_size):
+        dist = 1
+        recv = None
+        while dist < comm_size:
+            send = comm.Send(
+                src=rank,
+                dst=(rank + dist + comm_size) % comm_size,
+                size=datasize,
+                tag=tag,
+            )
+            if recv is not None:
+                send.requires(recv)
+            recv = comm.Recv(
+                src=(rank - dist + comm_size) % comm_size,
+                dst=rank,
+                size=datasize,
+                tag=tag,
+            )
+            dist *= 2
+    return comm
+
+
+def recdoub_allreduce(comm, comm_size, datasize, tag, ctd=0):
+    num_steps = int(log2(comm_size))
+    for rank in range(0, comm_size):
+        # Reduce-scatter
+        sources = [rank ^ (2**i) for i in range(num_steps)]
+        destinations = sources
+        data_sizes_receive = [datasize // (2**i) for i in range(1, num_steps + 1)]
+        data_sizes_send = data_sizes_receive
+        tags = [tag + i for i in range(num_steps)]
+        dependency = iterative_send_recv(
+            comm,
+            rank,
+            sources,
+            destinations,
+            data_sizes_receive,
+            data_sizes_send,
+            tags,
+            compute_time_dependency=ctd,
+        )
+
+        # Allgather
+        sources = sources[::-1]
+        destinations = sources
+        data_sizes_receive = data_sizes_receive[::-1]
+        data_sizes_send = data_sizes_send[::-1]
+        tags = [tag + num_steps + i for i in range(num_steps)]
+        iterative_send_recv(
+            comm,
+            rank,
+            sources,
+            destinations,
+            data_sizes_receive,
+            data_sizes_send,
+            tags,
+            last_dependency=dependency,
+            compute_time_dependency=ctd,
+        )
+
+
+def ring_allreduce(comm, comm_size, datasize, tag, ctd=0):
+    for rank in range(0, comm_size):
+        chunk_size = (
+            datasize // comm_size
+            if datasize % comm_size == 0
+            else datasize // comm_size + 1
+        )
+        sources = [(rank - 1) % comm_size] * (comm_size - 1)
+        destinations = [(rank + 1) % comm_size] * (comm_size - 1)
+        data_sizes_receive = [chunk_size] * (comm_size - 1)
+        data_sizes_send = [chunk_size] * (comm_size - 1)
+        tags = [tag + i for i in range(comm_size - 1)]
+        dependency = iterative_send_recv(
+            comm,
+            rank,
+            sources,
+            destinations,
+            data_sizes_receive,
+            data_sizes_send,
+            tags,
+            compute_time_dependency=ctd,
+        )
+        tags = [tag + comm_size - 1 + i for i in range(comm_size - 1)]
+        iterative_send_recv(
+            comm,
+            rank,
+            destinations,
+            sources,
+            data_sizes_send,
+            data_sizes_receive,
+            tags,
+            last_dependency=dependency,
+            compute_time_dependency=ctd,
+        )
+
+
+def allreduce(algorithm, comm_size, datasize, tag, ctd=0, **kwargs):
+    comm = GoalComm(comm_size)
+    if algorithm == "ring":
+        ring_allreduce(comm, comm_size, datasize, tag, ctd)
+    elif algorithm == "recdoub":
+        recdoub_allreduce(comm, comm_size, datasize, tag, ctd)
+    elif algorithm == "datasize_based":
+        if datasize < 4096:
+            recdoub_allreduce(comm, comm_size, datasize, tag, ctd)
+        else:
+            ring_allreduce(comm, comm_size, datasize, tag, ctd)
+    else:
+        raise ValueError(f"allreduce algorithm {algorithm} not implemented")
+    return comm
+
+
+def multi_allreduce(algorithm, num_comm_groups, comm_size, **kwargs):
+    comm = GoalComm(comm_size * num_comm_groups)
+    comms = comm.CommSplit(
+        color=[i // comm_size for i in range(comm_size * num_comm_groups)],
+        key=[i % comm_size for i in range(comm_size * num_comm_groups)],
+    )
+    for comm_split in comms:
+        allreduce(algorithm, comm_split.CommSize(), **kwargs)
+    return comm
+
+
+def windowed_alltoall(comm, comm_size, window_size, datasize, tag, **kwargs):
+    for rank in range(0, comm_size):
+        sources = [(rank - step) % comm_size for step in range(1, comm_size)]
+        destinations = [(rank + step) % comm_size for step in range(1, comm_size)]
+        data_sizes_receive = [datasize] * (comm_size - 1)
+        data_sizes_send = [datasize] * (comm_size - 1)
+
+        windowed_send_recv(
+            comm,
+            rank,
+            sources,
+            destinations,
+            data_sizes_receive,
+            data_sizes_send,
+            window_size,
+            tag,
+        )
+
+
+def balanced_alltoall(comm, comm_size, datasize, tag, **kwargs):
+    for rank in range(0, comm_size):
+        sources = [(rank - step) % comm_size for step in range(1, comm_size)]
+        destinations = [(rank + step) % comm_size for step in range(1, comm_size)]
+        data_sizes_receive = [datasize] * (comm_size - 1)
+        data_sizes_send = [datasize] * (comm_size - 1)
+
+        parallel_send_recv(
+            comm, rank, sources, destinations, data_sizes_receive, data_sizes_send, tag
+        )
+
+
+def unbalanced_alltoall(comm, comm_size, datasize, tag, **kwargs):
+    datasizes_randomized = [
+        [
+            datasize + int(0.1 * random.randint(-datasize, datasize))
+            for _ in range(comm_size)
+        ]
+        for _ in range(comm_size)
+    ]
+    for rank in range(0, comm_size):
+        sources = [(rank - step) % comm_size for step in range(1, comm_size)]
+        destinations = [(rank + step) % comm_size for step in range(1, comm_size)]
+        data_sizes_receive = [datasizes_randomized[src][rank] for src in sources]
+        data_sizes_send = [datasizes_randomized[rank][dst] for dst in destinations]
+
+        parallel_send_recv(
+            comm, rank, sources, destinations, data_sizes_receive, data_sizes_send, tag
+        )
+
+
+def alltoall(algorithm, comm_size, **kwargs):
+    comm = GoalComm(comm_size)
+    if algorithm == "windowed":
+        windowed_alltoall(comm, comm_size, **kwargs)
+    elif algorithm == "balanced":
+        balanced_alltoall(comm, comm_size, **kwargs)
+    elif algorithm == "unbalanced":
+        unbalanced_alltoall(comm, comm_size, **kwargs)
+    else:
+        raise ValueError(f"alltoall algorithm {algorithm} not implemented")
+    return comm
+
+
+def multi_alltoall(algorithm, num_comm_groups, comm_size, **kwargs):
+    comm = GoalComm(comm_size * num_comm_groups)
+    comms = comm.CommSplit(
+        color=[i // comm_size for i in range(comm_size * num_comm_groups)],
+        key=[i % comm_size for i in range(comm_size * num_comm_groups)],
+    )
+    for comm_split in comms:
+        alltoall(algorithm, comm_split.CommSize(), **kwargs)
+    return comm
+
+
+def incast(comm_size, unbalanced, datasize, tag, **kwargs):
+    comm = GoalComm(comm_size)
+    for src in range(1, comm_size):
+        size = (
+            datasize + int(0.1 * random.randint(-datasize, datasize))
+            if unbalanced
+            else datasize
+        )
+        comm.Send(src=src, dst=0, size=size, tag=tag)
+        comm.Recv(src=src, dst=0, size=size, tag=tag)
+    return comm
+
+
+def outcast(comm_size, unbalanced, datasize, tag, **kwargs):
+    comm = GoalComm(comm_size)
+    for dst in range(1, comm_size):
+        size = (
+            datasize + int(0.1 * random.randint(-datasize, datasize))
+            if unbalanced
+            else datasize
+        )
+        comm.Send(src=0, dst=dst, size=size, tag=tag)
+        comm.Recv(src=0, dst=dst, size=size, tag=tag)
+    return comm
diff --git a/src/Schedgen2/patterns.py b/src/Schedgen2/patterns.py
index 2581197..e4dc2ff 100644
--- a/src/Schedgen2/patterns.py
+++ b/src/Schedgen2/patterns.py
@@ -1,80 +1,154 @@
-from math import ceil, log2
-from goal import GoalComm
+from typing import List, Union
+from goal import GoalComm, GoalOp
 
-def binomialtree(comm_size, datasize, tag, dir="reduce"):
-    comm = GoalComm(comm_size)
-    for rank in range(0, comm_size):
-        send = None
-        recv = None
-        for r in range(0, ceil(log2(comm_size))):
-            peer = rank + pow(2, r)
-            if (rank + pow(2, r) < comm_size) and (rank < pow(2, r)):
-                if dir == "reduce":
-                    recv = comm.Recv(size=datasize, src=peer, dst=rank, tag=tag)
-                elif dir == "bcast":
-                    send = comm.Send(size=datasize, dst=peer, src=rank, tag=tag)
-                else:
-                    raise ValueError(
-                        "direction " + str(dir) + " in binomialtree not implemented."
-                    )
-                if (send is not None) and (recv is not None):
-                    send.requires(recv)
-            peer = rank - pow(2, r)
-            if (rank >= pow(2, r)) and (rank < pow(2, r + 1)):
-                if dir == "reduce":
-                    send = comm.Send(size=datasize, dst=peer, src=rank, tag=tag)
-                if dir == "bcast":
-                    recv = comm.Recv(size=datasize, src=peer, dst=rank, tag=tag)
+def _prepare_send_recv_data(
+    sources: Union[int, List[int]],
+    destinations: Union[int, List[int]],
+    data_sizes_receive: Union[int, List[int]],
+    data_sizes_send: Union[int, List[int]],
+    tags: Union[int, List[int]],
+):
+    if isinstance(sources, int) and isinstance(destinations, int):
+        sources = [sources]
+        destinations = [destinations]
+    elif isinstance(sources, int):
+        sources = [sources] * len(destinations)
+    elif isinstance(destinations, int):
+        destinations = [destinations] * len(sources)
+    assert len(sources) == len(
+        destinations
+    ), "sources and destinations must be the same length"
+    if isinstance(data_sizes_receive, int):
+        data_sizes_receive = [data_sizes_receive] * len(sources)
+    if isinstance(data_sizes_send, int):
+        data_sizes_send = [data_sizes_send] * len(destinations)
+    assert len(data_sizes_receive) == len(
+        sources
+    ), "data_sizes_receive and sources must be the same length"
+    assert len(data_sizes_send) == len(
+        destinations
+    ), "data_sizes_send and destinations must be the same length"
+    if isinstance(tags, int):
+        tags = [tags] * len(sources)
+    assert len(tags) == len(sources), "tags and sources must be the same length"
+    return sources, destinations, data_sizes_receive, data_sizes_send, tags
 
-    return comm
 
+def iterative_send_recv(
+    goal_comm: GoalComm,
+    rank: int,
+    sources: Union[int, List[int]],
+    destinations: Union[int, List[int]],
+    data_sizes_receive: Union[int, List[int]],
+    data_sizes_send: Union[int, List[int]],
+    tags: Union[int, List[int]],
+    last_dependency: GoalOp = None,
+    compute_time_dependency=0,
+) -> GoalOp:
+    """
+    Receive data from sources at rank and send data from rank to destinations with dependencies.
+
-def dissemination(comm_size, datasize, tag):
-    comm = GoalComm(comm_size)
-    for rank in range(0, comm_size):
-        dist = 1
-        recv = None
-        while dist < comm_size:
-            send = comm.Send(
-                src=rank,
-                dst=(rank + dist + comm_size) % comm_size,
-                size=datasize,
-                tag=tag,
-            )
-            if recv is not None:
-                send.requires(recv)
-            recv = comm.Recv(
-                src=(rank - dist + comm_size) % comm_size,
-                dst=rank,
-                size=datasize,
-                tag=tag,
+    :param goal_comm: GoalComm object that contains the ranks
+    :param rank: rank to receive data at from sources and send data to destinations
+    :param sources: rank(s) to receive data from
+    :param destinations: rank(s) to send data to
+    :param data_sizes_receive: size(s) of data to receive from sources
+    :param data_sizes_send: size(s) of data to send to destinations
+    :param tags: tags to use for send and receive operations
+    :param last_dependency: last operation in a previous chain of operations to depend on
+    :param compute_time_dependency: time to compute before sending data.
+        If 0 (default), no compute time is added and the send operation is dependent on the receive operation.
+    :return: GoalOp object that represents the last operation in the chain
+    """
+    dependency = last_dependency
+
+    for source, destination, data_size_receive, data_size_send, tag in zip(
+        *_prepare_send_recv_data(
+            sources, destinations, data_sizes_receive, data_sizes_send, tags
+        )
+    ):
+        send = goal_comm.Send(src=rank, dst=destination, size=data_size_send, tag=tag)
+        if dependency is not None:
+            send.requires(dependency)
+        dependency = goal_comm.Recv(
+            src=source, dst=rank, size=data_size_receive, tag=tag
+        )
+        if compute_time_dependency > 0:
+            # chain the compute after the receive, so the next send
+            # waits for both the receive and the compute
+            calc = goal_comm.Calc(host=rank, size=compute_time_dependency)
+            calc.requires(dependency)
+            dependency = calc
+    return dependency
+
+
+def windowed_send_recv(
+    goal_comm: GoalComm,
+    rank: int,
+    sources: Union[int, List[int]],
+    destinations: Union[int, List[int]],
+    data_sizes_receive: Union[int, List[int]],
+    data_sizes_send: Union[int, List[int]],
+    window_size: int,
+    tags: Union[int, List[int]],
+    last_dependencies: List[GoalOp] = None,
+):
+    """
+    Receive data from sources at rank and send data from rank to destinations, keeping at most window_size sends in flight.
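+
+    Send i is only issued once send i - window_size has completed, so no more
+    than window_size sends of this pattern are outstanding at any time.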
+
+    :param goal_comm: GoalComm object that contains the ranks
+    :param rank: rank to receive data at from sources and send data to destinations
+    :param sources: rank(s) to receive data from
+    :param destinations: rank(s) to send data to
+    :param data_sizes_receive: size(s) of data to receive from sources
+    :param data_sizes_send: size(s) of data to send to destinations
+    :param window_size: number of operations that can be in flight at once
+    :param tags: tags to use for send and receive operations
+    :param last_dependencies: last operations in a previous chain of operations to depend on
+    """
+    assert (
+        not last_dependencies or len(last_dependencies) == window_size
+    ), "last_dependencies must be the same length as window_size"
+
+    window = last_dependencies or [None] * window_size
+
+    for i, (source, destination, data_size_receive, data_size_send, tag) in enumerate(
+        zip(
+            *_prepare_send_recv_data(
+                sources, destinations, data_sizes_receive, data_sizes_send, tags
             )
-            dist *= 2
-    return comm
+        )
+    ):
+        send = goal_comm.Send(src=rank, dst=destination, size=data_size_send, tag=tag)
+        if window[i % window_size] is not None:
+            send.requires(window[i % window_size])
+        window[i % window_size] = send
+        goal_comm.Recv(src=source, dst=rank, size=data_size_receive, tag=tag)
+
+    return window
+
+
+def parallel_send_recv(
+    goal_comm: GoalComm,
+    rank: int,
+    sources: Union[int, List[int]],
+    destinations: Union[int, List[int]],
+    data_sizes_receive: Union[int, List[int]],
+    data_sizes_send: Union[int, List[int]],
+    tags: Union[int, List[int]],
+):
+    """
+    Receive data from sources at rank and send data from rank to destinations without dependencies.
+
-def ring_allreduce(comm_size, datasize, base_tag):
-    comm = GoalComm(comm_size)
-    for rank in range(0, comm_size):
-        recv = None
-        send = None
-        chunk_size = (
-            datasize // comm_size
-            if datasize % comm_size == 0
-            else datasize // comm_size + 1
+    :param goal_comm: GoalComm object that contains the ranks
+    :param rank: rank to receive data at from sources and send data to destinations
+    :param sources: rank(s) to receive data from
+    :param destinations: rank(s) to send data to
+    :param data_sizes_receive: size(s) of data to receive from sources
+    :param data_sizes_send: size(s) of data to send to destinations
+    :param tags: tags to use for send and receive operations
+    """
+    for source, destination, data_size_receive, data_size_send, tag in zip(
+        *_prepare_send_recv_data(
+            sources, destinations, data_sizes_receive, data_sizes_send, tags
         )
-        for _ in range(2):
-            # Phase 0: reduce-scatter, Phase 1: allgather
-            for _ in range(0, comm_size - 1):
-                send = comm.Send(
-                    src=rank, dst=(rank + 1) % comm_size, size=chunk_size, tag=base_tag
-                )
-                if recv is not None:
-                    send.requires(recv)
-                recv = comm.Recv(
-                    src=(rank - 1) % comm_size, dst=rank, size=chunk_size, tag=base_tag
-                )
-            # update tag for next phase
-            base_tag += 1
-    return comm
+    ):
+        goal_comm.Send(src=rank, dst=destination, size=data_size_send, tag=tag)
+        goal_comm.Recv(src=source, dst=rank, size=data_size_receive, tag=tag)
diff --git a/src/Schedgen2/schedgen.py b/src/Schedgen2/schedgen.py
index 0864d35..5882899 100644
--- a/src/Schedgen2/schedgen.py
+++ b/src/Schedgen2/schedgen.py
@@ -1,64 +1,189 @@
 import sys
+import json
+import os
+import tempfile
+import subprocess
 import argparse
-from patterns import binomialtree, dissemination, ring_allreduce
+from mpi_colls import (
+    allreduce,
+    alltoall,
+    binomialtree,
+    dissemination,
+    incast,
+    multi_allreduce,
+    multi_alltoall,
+    outcast,
+)
 
 parser = argparse.ArgumentParser(description="Generate GOAL Schedules.")
-parser.add_argument(
-    "--ptrn",
-    dest="ptrn",
-    choices=[
-        "binomialtreereduce",
-        "binarytreebcast",
-        "dissemination",
-        "ring_allreduce",
-    ],
-    help="Pattern to generate",
-    required=True,
-)
-parser.add_argument(
-    "--commsize", dest="commsize", type=int, default=8, help="Size of the communicator"
-)
-parser.add_argument(
-    "--datasize",
-    dest="datasize",
-    type=int,
-    default=8,
-    help="Size of the data, i.e., for reduce operations",
-)
-parser.add_argument("--output", dest="output", default="stdout", help="Output file")
-parser.add_argument(
-    "--ignore_verification",
-    dest="ignore_verification",
-    action="store_true",
-    help="Ignore verification of parameters",
+
+subparsers = parser.add_subparsers(
+    help="Pattern to generate", dest="ptrn", required=True
 )
+simple_patterns = []
+multi_patterns = []
+
+incast_parser = subparsers.add_parser("incast")
+simple_patterns.append(incast_parser)
+
+outcast_parser = subparsers.add_parser("outcast")
+simple_patterns.append(outcast_parser)
+
+for p in [incast_parser, outcast_parser]:
+    p.add_argument(
+        "--unbalanced",
+        dest="unbalanced",
+        action="store_true",
+        help="Use unbalanced incast/outcast",
+    )
+
+binomialtreereduce_parser = subparsers.add_parser("binomialtreereduce")
+simple_patterns.append(binomialtreereduce_parser)
+
+binomialtreebcast_parser = subparsers.add_parser("binomialtreebcast")
+simple_patterns.append(binomialtreebcast_parser)
+
+dissemination_parser = subparsers.add_parser("dissemination")
+simple_patterns.append(dissemination_parser)
+
+allreduce_parser = subparsers.add_parser("allreduce")
+simple_patterns.append(allreduce_parser)
+
+multi_allreduce_parser = subparsers.add_parser("multi_allreduce")
+multi_patterns.append(multi_allreduce_parser)
+
+alltoall_parser = subparsers.add_parser("alltoall")
+simple_patterns.append(alltoall_parser)
+
+multi_alltoall_parser = subparsers.add_parser("multi_alltoall")
+multi_patterns.append(multi_alltoall_parser)
+
+for p in [
+    allreduce_parser,
+    multi_allreduce_parser,
+]:
+    p.add_argument(
+        "--algorithm",
+        dest="algorithm",
+        choices=[
+            "ring",
+            "recdoub",
+            "datasize_based",
+        ],
+        default="datasize_based",
+        help="Algorithm to use for allreduce",
+    )
+
+for p in [
+    alltoall_parser,
+    multi_alltoall_parser,
+]:
+    p.add_argument(
+        "--algorithm",
+        dest="algorithm",
+        choices=[
+            "windowed",
+            "balanced",
+            "unbalanced",
+        ],
+        default="windowed",
+        help="Algorithm to use for alltoall",
+    )
+    p.add_argument(
+        "--window_size",
+        dest="window_size",
+        type=int,
+        default=8,
+        help="Window size for windowed alltoall",
+    )
+
+for p in simple_patterns + multi_patterns:
+    p.add_argument(
+        "--comm_size",
+        dest="comm_size",
+        type=int,
+        default=8,
+        help="Size of the communicator",
+    )
+    p.add_argument(
+        "--datasize",
+        dest="datasize",
+        type=int,
+        default=8,
+        help="Size of the data, e.g., for reduce operations",
+    )
+    p.add_argument("--output", dest="output", default="stdout", help="Output file")
+    p.add_argument(
+        "--ignore_verification",
+        dest="ignore_verification",
+        action="store_true",
+        help="Ignore verification of parameters",
+    )
+    p.add_argument(
+        "--config",
+        dest="config",
+        help="Configuration file, takes precedence over other parameters",
+    )
+    p.add_argument(
+        "--txt2bin",
+        dest="txt2bin",
+        help="Path to txt2bin executable",
+    )
+
+for p in multi_patterns:
+    p.add_argument(
+        "--num_comm_groups",
+        dest="num_comm_groups",
+        type=int,
+        required=True,
+        help="Number of communication groups",
+    )
 
 
 def verify_params(args):
     if args.ignore_verification:
         return
"Communicator size must be greater than 0." + assert args.comm_size > 0, "Communicator size must be greater than 0." assert args.datasize > 0, "Data size must be greater than 0." + assert ( + args.txt2bin is None or args.output != "stdout" + ), "Cannot use txt2bin with stdout" args = parser.parse_args() +if args.config is not None: + with open(args.config, "r") as f: + config = json.load(f) + for k, v in config.items(): + setattr(args, k, v) -verify_params(args) -if args.output == "stdout": - args.output = sys.stdout -else: - args.output = open(args.output, "w") +verify_params(args) if args.ptrn == "binomialtreereduce": - g = binomialtree(args.commsize, args.datasize, 42, "reduce") + g = binomialtree(args.comm_size, args.datasize, 42, "reduce") elif args.ptrn == "binomialtreebcast": - g = binomialtree(args.commsize, args.datasize, 42, "bcast") + g = binomialtree(args.comm_size, args.datasize, 42, "bcast") elif args.ptrn == "dissemination": - g = dissemination(args.commsize, args.datasize, 42) -elif args.ptrn == "ring_allreduce": - g = ring_allreduce(args.commsize, args.datasize, 42) + g = dissemination(args.comm_size, args.datasize, 42) +elif args.ptrn == "allreduce": + g = allreduce(tag=42, **vars(args)) +elif args.ptrn == "multi_allreduce": + g = multi_allreduce(tag=42, **vars(args)) +elif args.ptrn == "alltoall": + g = alltoall(tag=42, **vars(args)) +elif args.ptrn == "multi_alltoall": + g = multi_alltoall(tag=42, **vars(args)) +elif args.ptrn == "incast": + g = incast(tag=42, **vars(args)) +elif args.ptrn == "outcast": + g = outcast(tag=42, **vars(args)) + +if args.txt2bin is not None: + assert args.output != "stdout", "Cannot use txt2bin with stdout" + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + g.write_goal(fh=f) + tmp_goal_file = f.name + subprocess.run( + [args.txt2bin, "-i", tmp_goal_file, "-o", args.output, "-p"], + check=True, + ) + subprocess.run(["rm", tmp_goal_file], check=True) +else: + if args.output == "stdout": + args.output = sys.stdout + else: + args.output = open(args.output, "w") -g.write_goal(fh=args.output) -if args.output != sys.stdout: - args.output.close() + g.write_goal(fh=args.output) + if args.output != sys.stdout: + args.output.close()