diff --git a/MasterWorker.hs b/MasterWorker.hs
index 7a4b2310be781148713c69ee62f1fe60a8390062..c39850b6c631b962cfb1e6b36cf720602610fa93 100644
--- a/MasterWorker.hs
+++ b/MasterWorker.hs
@@ -3,7 +3,10 @@
 -- orbit-int master (controlling orbit computation)
 --
 module MasterWorker( -- Master
-                     orbit
+                     GenClos
+                   , HostInfo (..)
+                   , MaybeHosts(..)
+                   , orbit
                    , get_gens, get_master, get_workers, get_spawn_img_comp
                    , get_global_table_size, get_idle_timeout
                    , set_idle_timeout, clear_spawn_img_comp
@@ -73,151 +76,6 @@ type ParConf =
 type WorkerStats = [(String, String)]
 
 
--- DATA
--- Static Machine Configuration:
---   {Gs,               %list of generators
---    Master,           %pid of master process
---    Workers,          %list of Worker
---    GlobalTableSize,  %size of global hash table
---    IdleTimeout,      %milliseconds this worker idles before sending 'done'
---    SpawnImgComp}     %true iff this worker spawns image computations
---
--- Worker:
---   {Pid,              %pid of worker process
---    TableOffset,      %offset (= index 0) of local table into global table
---    TableSize}        %size of local hash table
---
--- Host:
---   {Node,             %atom naming Erlang node
---    Procs,            %number of processors
---    TableSize,        %size of hash table per processor
---    IdleTimeout}      %milliseconds a processor idles before sending 'done'
---
--- Statistics:
---   List of pairs where the first component is an atom, the second
---   some data. Part of the data is the fill frequency of the table
---   (a list whose ith element indicates frequency of filling degree i).
-
-
--- MESSAGES
--- Master -> Worker: {init, StaticMachConf}
---
--- Master/Worker -> Worker: {vertex, X, Slot, K}
---                          %X is vertex
---                          %Slot is slot of X on target worker
---                          %K is atomic credit shipped with vertex
---
--- Worker -> Master: {done, Cs}
---                   %Cs is non-zero credit (rep as list of ints)
---
--- Master -> Worker: {dump}
---
--- Worker -> Master: {result, Xs, Stats}
---                   %Xs is list of found orbit vertices
---                   %Stats is statistics about worker's table
-
-
--- compute orbit of elements in list Xs under list of generators Gs;
--- the argument Hosts is either an integer N, a triple {P, N, T}, or
--- a non-empty list [{H, P, N, T} | ...] of quadruples:
--- * N: run the sequential algorithm with table size N
--- * {P, N, T, S}: run the parallel algorithm on P processors
---                 each with table size N, idle timeout T and
---                 spawn image computation flag S;
--- * [{H, P, N, T, S} | ...]: run the distributed algorithm on the list of
---                            hosts, where each quintuple {H, P, N, T, S}
---                            specifies
---   * host name H (ie. name of Erlang node),
---   * number of processors P on H,
---   * table size N (per processor),
---   * idle timeout T, and
---   * spawn image computation flag S.
--- The function returns a pair consisting of the computed orbit and
--- a list of statistics, the first element of which reports overall statistics,
--- and all remaining elements report statistics of some worker.
-orbit :: GenClos -> [Vertex] -> MaybeHosts -> Process ([Vertex], [MasterStats])
-orbit (GenClos (_, _, gs)) xs (Seq tablesize) =
-    return $ Sq.orbit gs xs tablesize
-orbit gs xs (Par hostInfo) = par_orbit gs xs hostInfo
-
-par_orbit :: GenClos -> [Vertex] -> HostInfo
-          -> Process ([Vertex], [MasterStats])
-par_orbit gs xs hosts = do
-    -- spawn workers on Hosts
-    (workers, globTabSize) <- start_workers hosts
-    self <- getSelfPid
-    let -- assemble StaticMachConf and distribute to Workers
-        staticMachConf = mk_static_mach_conf gs self workers globTabSize
-    mapM_ (\(pid, _, _) -> send pid ("init", staticMachConf)) workers
-    let -- start wall clock timer
-        startTime = now
-    -- distribute initial vertices to workers
-    credit <- distribute_vertices staticMachConf one xs
-    -- collect credit handed back by idle workers
-    collect_credit credit
-    let -- measure elapsed time (in milliseconds)
-        elapsedTime = now - startTime
-    -- tell all workers to dump their tables
-    mapM_ (\(pid, _, _) -> send pid "dump") workers
-    -- collect results from all workers and return them
-    collect_orbit elapsedTime (length workers)
-
--- start_workers starts worker processes depending on the input Hosts:
--- * if Hosts is a quadruple {P, _, _, _} then P processes are forked on the
---   executing Erlang node;
--- * if Hosts is a non-empty list {H1, P1, _, _, _}, {H2, P2, _, _, _}, ...
---   then P1 processes are forked on Erlang node H1, P2 processes on node H2,
---   and so on.
--- The function returns a pair {Workers, GlobalTableSize}, where
--- * GlobalTableSize is the total number of slots of the global hash table, and
--- * Workers is a list of Worker, sorted wrt. TableOffset in ascending order.
-start_workers :: HostInfo -> Process ([(ProcessId, Int, Int)], Int)
-start_workers (JustOne host) = do
-    (workers, globalTableSize) <- do_start_shm host ([], 0)
-    return (reverse workers, globalTableSize)
-start_workers (Many hosts) = do
-    (workers, globalTableSize) <- do_start_dist hosts ([], 0)
-    return (reverse workers, globalTableSize)
-
-do_start_shm (0, _, _, _) acc = return acc
-do_start_shm (m, tabSize, tmOut, spawnImgComp) (workers, gTabSize) = do
-    selfNode <- getSelfNode
-    pid <- spawnLink selfNode ($(mkClosure 'init) (tabSize, tmOut, spawnImgComp))
-    do_start_shm (m - 1, tabSize, tmOut, spawnImgComp)
-                 ((pid, gTabSize, tabSize) : workers, gTabSize + tabSize)
-
-do_start_dist [] acc = return acc
-do_start_dist ((_, 0, _, _, _) : hosts) acc = do_start_dist hosts acc
-do_start_dist ((node,m,tabSize,tmOut,spawnImgComp) : hosts) (workers,gTabSize) = do
-    pid <- spawnLink node ($(mkClosure 'init) (tabSize, tmOut, spawnImgComp))
-    do_start_dist ((node, m - 1, tabSize, tmOut, spawnImgComp) : hosts)
-                  ((pid, gTabSize, tabSize) : workers, gTabSize + tabSize)
-
--- collect_credit collects leftover credit from idle workers until
--- the credit adds up to 1.
-collect_credit :: Credit -> Process ()
-collect_credit crdt
-    | is_one crdt = return ()
-    | otherwise = receiveWait [
-          match $ \("done", workersCredit) ->
-              collect_credit $ credit workersCredit crdt
-        ]
-
--- collect_orbit collects partial orbits and stats from N workers.
-collect_orbit :: Int -> Int -> Process ([Vertex], [MasterStats])
-collect_orbit elapsedTime n = do
-    (orbit, stats) <- do_collect_orbit n [] []
-    return (concat orbit, master_stats elapsedTime stats : stats)
-
-do_collect_orbit :: Int -> [[Vertex]] -> [WorkerStats]
-                 -> Process ([[Vertex]], [WorkerStats])
-do_collect_orbit 0 partOrbits workerStats = return (partOrbits, workerStats)
-do_collect_orbit n partOrbits workerStats = do
-    receiveWait [
-        match $ \("result", partOrbit, workerStat) ->
-            do_collect_orbit (n - 1) (partOrbit : partOrbits) (workerStat : workerStats)
-      ]
-
 ------------------------------------------------------------------------------
 -- auxiliary functions
 
@@ -292,8 +150,8 @@ defaultCt = Ct { verts_recvd = 0
                }
 
 -- initialise worker
-init :: Int -> Int -> Bool -> Process ()
-init localTableSize idleTimeout spawnImgComp =
+init :: (Int, Int, Bool) -> Process ()
+init (localTableSize, idleTimeout, spawnImgComp) =
     receiveWait [
         match $ \("init", staticMachConf0) -> do
             let staticMachConf1 = set_idle_timeout staticMachConf0 idleTimeout
@@ -303,8 +161,6 @@ init localTableSize idleTimeout spawnImgComp =
             vertex_server staticMachConf zero (new localTableSize) defaultCt
     ]
 
-remotable ['init]
-
 -- main worker loop: server handling vertex messages;
 -- StaticMachConf: info about machine configuration
 -- Credit: credit currently held by the server,
@@ -493,3 +349,159 @@ tail_idle_from_stat stat =
 max_idle_from_stat :: WorkerStats -> Int
 max_idle_from_stat stat =
     read (fromJust ("max_idle_time" `lookup` stat)) :: Int
+
+remotable ['init]
+
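+-- NOTE: 'remotable' is a Template Haskell declaration splice: it must come
+-- after the definition of 'init' (which it names) and before the first use
+-- of $(mkClosure 'init) below, since a splice can only refer to declarations
+-- from earlier declaration groups in the module.
+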
+--
+-- orbit-int master (controlling orbit computation)
+--
+
+-- DATA
+-- Static Machine Configuration:
+--   {Gs,               %list of generators
+--    Master,           %pid of master process
+--    Workers,          %list of Worker
+--    GlobalTableSize,  %size of global hash table
+--    IdleTimeout,      %milliseconds this worker idles before sending 'done'
+--    SpawnImgComp}     %true iff this worker spawns image computations
+--
+-- Worker:
+--   {Pid,              %pid of worker process
+--    TableOffset,      %offset (= index 0) of local table into global table
+--    TableSize}        %size of local hash table
+--
+-- Host:
+--   {Node,             %atom naming Erlang node
+--    Procs,            %number of processors
+--    TableSize,        %size of hash table per processor
+--    IdleTimeout}      %milliseconds a processor idles before sending 'done'
+--
+-- Statistics:
+--   List of pairs where the first component is an atom, the second
+--   some data. Part of the data is the fill frequency of the table
+--   (a list whose ith element indicates frequency of filling degree i).
+
+
+-- MESSAGES
+-- Master -> Worker: {init, StaticMachConf}
+--
+-- Master/Worker -> Worker: {vertex, X, Slot, K}
+--                          %X is vertex
+--                          %Slot is slot of X on target worker
+--                          %K is atomic credit shipped with vertex
+--
+-- Worker -> Master: {done, Cs}
+--                   %Cs is non-zero credit (rep as list of ints)
+--
+-- Master -> Worker: {dump}
+--
+-- Worker -> Master: {result, Xs, Stats}
+--                   %Xs is list of found orbit vertices
+--                   %Stats is statistics about worker's table
+
+
+-- compute orbit of elements in list Xs under list of generators Gs;
+-- the argument Hosts is either an integer N, a quadruple {P, N, T, S}, or
+-- a non-empty list [{H, P, N, T, S} | ...] of quintuples:
+-- * N: run the sequential algorithm with table size N
+-- * {P, N, T, S}: run the parallel algorithm on P processors
+--                 each with table size N, idle timeout T and
+--                 spawn image computation flag S;
+-- * [{H, P, N, T, S} | ...]: run the distributed algorithm on the list of
+--                            hosts, where each quintuple {H, P, N, T, S}
+--                            specifies
+--   * host name H (i.e. name of Erlang node),
+--   * number of processors P on H,
+--   * table size N (per processor),
+--   * idle timeout T, and
+--   * spawn image computation flag S.
+-- The function returns a pair consisting of the computed orbit and
+-- a list of statistics, the first element of which reports overall statistics,
+-- and all remaining elements report statistics of some worker.
+
+orbit :: GenClos -> [Vertex] -> MaybeHosts -> Process ([Vertex], [MasterStats])
+orbit (GenClos (_, _, gs)) xs (Seq tablesize) =
+    return $ Sq.orbit gs xs tablesize
+orbit gs xs (Par hostInfo) = par_orbit gs xs hostInfo
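+
+-- Illustrative sketch only: the three shapes of the Hosts argument described
+-- above, built from hypothetical table sizes, idle timeouts and caller-supplied
+-- NodeIds; e.g. orbit gens vs (Par (JustOne (4, 25000, 10, True))) for some
+-- assumed gens :: GenClos and vs :: [Vertex].
+exampleHosts :: [NodeId] -> [MaybeHosts]
+exampleHosts nodes =
+    [ Seq 100000                                          -- sequential, table size 100000
+    , Par (JustOne (4, 25000, 10, True))                  -- 4 local workers
+    , Par (Many [(n, 4, 25000, 10, True) | n <- nodes])   -- one entry per remote node
+    ]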
+
+par_orbit :: GenClos -> [Vertex] -> HostInfo
+          -> Process ([Vertex], [MasterStats])
+par_orbit gs xs hosts = do
+    -- spawn workers on Hosts
+    (workers, globTabSize) <- start_workers hosts
+    self <- getSelfPid
+    let -- assemble StaticMachConf and distribute to Workers
+        staticMachConf = mk_static_mach_conf gs self workers globTabSize
+    mapM_ (\(pid, _, _) -> send pid ("init", staticMachConf)) workers
+    let -- start wall clock timer
+        startTime = now
+    -- distribute initial vertices to workers
+    credit <- distribute_vertices staticMachConf one xs
+    -- collect credit handed back by idle workers
+    collect_credit credit
+    let -- measure elapsed time (in milliseconds)
+        elapsedTime = now - startTime
+    -- tell all workers to dump their tables
+    mapM_ (\(pid, _, _) -> send pid "dump") workers
+    -- collect results from all workers and return them
+    collect_orbit elapsedTime (length workers)
+
+-- start_workers starts worker processes depending on the input Hosts:
+-- * if Hosts is a quadruple {P, _, _, _} then P processes are forked on the
+--   executing Erlang node;
+-- * if Hosts is a non-empty list {H1, P1, _, _, _}, {H2, P2, _, _, _}, ...
+--   then P1 processes are forked on Erlang node H1, P2 processes on node H2,
+--   and so on.
+-- The function returns a pair {Workers, GlobalTableSize}, where
+-- * GlobalTableSize is the total number of slots of the global hash table, and
+-- * Workers is a list of Worker, sorted wrt. TableOffset in ascending order.
+start_workers :: HostInfo -> Process ([(ProcessId, Int, Int)], Int)
+start_workers (JustOne host) = do
+    (workers, globalTableSize) <- do_start_shm host ([], 0)
+    return (reverse workers, globalTableSize)
+start_workers (Many hosts) = do
+    (workers, globalTableSize) <- do_start_dist hosts ([], 0)
+    return (reverse workers, globalTableSize)
+
+do_start_shm :: (Int, Int, Int, Bool) -> ([(ProcessId, Int, Int)], Int)
+             -> Process ([(ProcessId, Int, Int)], Int)
+do_start_shm (0, _, _, _) acc = return acc
+do_start_shm (m, tabSize, tmOut, spawnImgComp) (workers, gTabSize) = do
+    selfNode <- getSelfNode
+    pid <- spawnLink selfNode ($(mkClosure 'init) (tabSize, tmOut, spawnImgComp))
+    do_start_shm (m - 1, tabSize, tmOut, spawnImgComp)
+                 ((pid, gTabSize, tabSize) : workers, gTabSize + tabSize)
+
+do_start_dist :: [(NodeId, Int, Int, Int, Bool)] -> ([(ProcessId, Int, Int)], Int)
+              -> Process ([(ProcessId, Int, Int)], Int)
+do_start_dist [] acc = return acc
+do_start_dist ((_, 0, _, _, _) : hosts) acc = do_start_dist hosts acc
+do_start_dist ((node,m,tabSize,tmOut,spawnImgComp) : hosts) (workers,gTabSize) = do
+    pid <- spawnLink node ($(mkClosure 'init) (tabSize, tmOut, spawnImgComp))
+    do_start_dist ((node, m - 1, tabSize, tmOut, spawnImgComp) : hosts)
+                  ((pid, gTabSize, tabSize) : workers, gTabSize + tabSize)
+
+-- collect_credit collects leftover credit from idle workers until
+-- the credit adds up to 1.
+collect_credit :: Credit -> Process ()
+collect_credit crdt
+    | is_one crdt = return ()
+    | otherwise = receiveWait [
+          match $ \("done", workersCredit) ->
+              collect_credit $ credit workersCredit crdt
+        ]
+
+-- collect_orbit collects partial orbits and stats from N workers.
+collect_orbit :: Int -> Int -> Process ([Vertex], [MasterStats])
+collect_orbit elapsedTime n = do
+    (orbit, stats) <- do_collect_orbit n [] []
+    return (concat orbit, master_stats elapsedTime stats : stats)
+
+do_collect_orbit :: Int -> [[Vertex]] -> [WorkerStats]
+                 -> Process ([[Vertex]], [WorkerStats])
+do_collect_orbit 0 partOrbits workerStats = return (partOrbits, workerStats)
+do_collect_orbit n partOrbits workerStats = do
+    receiveWait [
+        match $ \("result", partOrbit, workerStat) ->
+            do_collect_orbit (n - 1) (partOrbit : partOrbits) (workerStat : workerStats)
+      ]