# Copyright (C) 2018 RW Bunney
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
########################################################################
"""
This module contains code for implementing heuristic-based scheduling
algorithms. Currently, this file implements the following algorithms:
* HEFT
* PHEFT
"""
from random import randint
import networkx as nx
import operator
import copy
import logging
from tqdm import tqdm
from shadow.models.solution import Solution
from collections import deque
# NOTE(review): RANDMAX is not referenced anywhere in the visible part of this
# module; presumably an upper bound for randomly generated values -- confirm
# against other modules before removing.
RANDMAX = 1000
# Uncomment the next line to enable debug-level logging output:
# logging.basicConfig(level="DEBUG")
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
#############################################################################
############################# HEURISTICS ###################################
#############################################################################
def heft(workflow, position=0):
    """
    Implementation of the original HEFT algorithm, Topcuoglu 2002.

    Tasks are first ranked with ``calculate_upward_ranks`` and the ranks are
    stored on each task's ``rank`` attribute; tasks are then allocated to
    machines with ``insertion_policy``.

    :param workflow: The workflow object to schedule
    :param position: Forwarded unchanged to the ranking and allocation
        helpers (they use it to place their progress bars)
    :returns: The Solution object generated by the algorithm
    :raises RuntimeError: If ``workflow.env`` is None
    """
    if workflow.env is None:
        raise RuntimeError("Workflow environment is not initialised")

    LOGGER.debug('Ranking tasks')
    task_ranks = calculate_upward_ranks(workflow, position)
    for task in workflow.tasks:
        task.rank = task_ranks[task]

    LOGGER.debug('Allocating tasks using insertion policy')
    return insertion_policy(workflow, position)
def pheft(workflow):
    """
    Implementation of the PHEFT algorithm, which adapts the HEFT algorithm
    using the concept of an Optimistic Cost Table (OCT).

    Each task's rank is the integer-truncated mean of its OCT entries over
    all machines; the ranked tasks are then allocated with
    ``insertion_policy_oct``.

    :param workflow: The workflow object to schedule
    :returns: The solution produced by ``insertion_policy_oct``
    :raises RuntimeError: If ``workflow.env`` is None (same guard as
        ``heft`` and ``fcfs``)
    """
    if workflow.env is None:
        raise RuntimeError("Workflow environment is not initialised")

    oct_rank_matrix = generate_ranking_matrix(workflow)
    machine_count = len(workflow.env.machines)

    # Rank tasks according to the oct_rank_matrix. Identity (``is``) is used
    # on purpose so that only entries for this exact task object are summed.
    for task in workflow.tasks:
        total = sum(
            value
            for (ranked_task, _machine), value in oct_rank_matrix.items()
            if ranked_task is task
        )
        task.rank = int(total / machine_count)

    return insertion_policy_oct(workflow, oct_rank_matrix)
def fcfs(workflow, greedy=True, seed=None):
    """
    First Come First Serve scheduling entry point.

    Validates that the workflow has an environment and then delegates the
    actual allocation to ``fcfs_allocation``.

    :param workflow: The workflow object to schedule
    :param greedy: Passed through to ``fcfs_allocation``
    :param seed: Passed through to ``fcfs_allocation``
    :return: Whatever ``fcfs_allocation`` returns for this workflow
    :raises RuntimeError: If ``workflow.env`` is None
    """
    if workflow.env is not None:
        return fcfs_allocation(workflow, greedy, seed)
    raise RuntimeError("Workflow environment is not initialised")
def minimin(workflow):
    """
    Placeholder for the algorithm from "Real-Time Task Mapping and
    Scheduling for Collaborative In-Network Processing in DVS-Enabled
    Wireless Sensor Networks". Not implemented: always returns None.

    Planned procedure for identifying the meta-tasks:

    * Topological sort
    * Create a set of sets
    * For each node:
        * If it has no parent, update the first set
        * If it has a parent, then for each set: if the parent is in that
          set, add the node to the set below

    The result should look something like::

        {{0}, {1,2,3}, {4,5,6}, {7}, {8,9}, {10}}
    """
    return None
def hlfet(workflow):
    """Placeholder heuristic: not implemented, always returns None."""
    return None
def mapping(workflow):
    """Placeholder heuristic: not implemented, always returns None."""
    return None
# TODO: Partial Critical Paths
def pcp(workflow):
    """Placeholder heuristic: not implemented, always returns None."""
    return
# TODO: Multi-objective list scheduling
def mols(workflow):
    """Placeholder heuristic: not implemented, always returns None."""
    return
#############################################################################
############### HELPER FUNCTIONS & HEURISTIC-SPECIFIC POLICIES ##############
#############################################################################
def generate_ranking_matrix(workflow):
    """
    Optimistic cost table (OCT) ranking heuristic outlined in
    Arabnejad and Barbosa (2014).

    For every (task, machine) pair the table stores the maximum, over the
    task's successors, of the minimum over all machines of::

        OCT(successor, machine) + runtime(successor, machine) + comm_cost

    where ``comm_cost`` is 0 when the successor's machine is the same object
    as the pair's machine and ``ave_comm_cost`` otherwise. Tasks without
    successors get 0.

    Pairs are processed in reverse sorted-task order (LIFO stack), so a
    successor must sort after its predecessors; otherwise the lookup of the
    successor's entry raises ``KeyError``.

    :param workflow: Workflow providing ``tasks``, ``graph`` and
        ``env.machines``
    :returns: dict mapping ``(task, machine)`` to its OCT value
    """
    machines = workflow.env.machines
    oct_rank_matrix = {}
    stack = deque(
        (task, machine)
        for task in sorted(workflow.tasks)
        for machine in machines
    )

    while stack:
        curr_task, pk = stack.pop()
        max_successor = 0
        for successor in workflow.graph.successors(curr_task):
            # Start from infinity rather than a fixed bound, so that costs
            # larger than any hard-coded constant are not silently capped.
            min_machine = float("inf")
            for machine in machines:
                comp_cost = successor.calculated_runtime(machine)
                comm_cost = 0
                if machine is not pk:
                    comm_cost = ave_comm_cost(workflow, curr_task, successor)
                oct_val = (oct_rank_matrix[(successor, machine)]
                           + comp_cost + comm_cost)
                min_machine = min(min_machine, oct_val)
            max_successor = max(max_successor, min_machine)
        oct_rank_matrix[(curr_task, pk)] = max_successor

    return oct_rank_matrix
def calculate_upward_ranks(workflow, position=0, progress=True):
    """
    Upward ranking heuristic outlined in Topcuoglu, Hariri & Wu (2002).
    Closely modelled off the 'cal_up_rank' function at:
    https://github.com/oyld/heft/blob/master/src/heft.py

    A task without successors is ranked ``max(average runtime, 0)``. Every
    other task is ranked once all of its successors are ranked, as::

        max(ave_comm_cost(task, s) + rank(s) for s in successors)
            + max(average runtime, 1)

    :param workflow: Subject workflow (provides ``graph`` and ``env``)
    :param position: Position of the tqdm progress bar; also shown in the
        bar's description
    :param progress: When True, display a tqdm progress bar
    :returns: dict mapping each node of ``workflow.graph`` to its rank
    :raises RuntimeError: If a full pass ranks no task (e.g. the graph
        contains a cycle), instead of looping forever
    """
    graph = workflow.graph
    env = workflow.env
    task_ranks = {}
    unranked = []

    # Exit tasks (no successors) can be ranked straight away.
    for node in graph.nodes():
        if list(graph.successors(node)):
            unranked.append(node)
        else:
            task_ranks[node] = max(node.calc_ave_runtime(env), 0)

    pbar = None
    if progress:
        pbar = tqdm(total=len(graph.nodes()), unit="Tasks",
                    desc=f'Ranking tasks {position}', position=position)
        # Account for the exit tasks ranked above so the bar can complete.
        pbar.update(len(task_ranks))

    while unranked:
        # Build the next work list instead of removing from the list that
        # is being iterated, which would skip elements.
        pending = []
        for node in unranked:
            succs = list(graph.successors(node))
            if not all(s in task_ranks for s in succs):
                pending.append(node)
                continue
            longest_rank = max(
                ave_comm_cost(workflow, node, s) + task_ranks[s]
                for s in succs
            )
            task_ranks[node] = longest_rank + max(
                node.calc_ave_runtime(env), 1)
            if pbar is not None:
                pbar.update()
        if len(pending) == len(unranked):
            if pbar is not None:
                pbar.close()
            raise RuntimeError(
                "Unable to rank remaining tasks - the workflow graph may "
                "contain a cycle")
        unranked = pending

    if pbar is not None:
        pbar.close()
    return task_ranks
def ave_comm_cost(workflow, task, successor):
    """
    Return the data transfer time for the edge ``task -> successor``.

    Reads the edge's 'transfer_data' attribute from the workflow graph and
    converts it to a time via ``workflow.env.calc_data_transfer_time``.
    """
    edge_attributes = workflow.graph.edges[task, successor]
    return workflow.env.calc_data_transfer_time(
        transfer_data=edge_attributes['transfer_data'])
def ave_comp_cost(workflow, task):
    """
    Return the mean of the 'comp' costs stored for ``task`` in
    ``workflow.tasks``.
    """
    costs = workflow.tasks[task]['comp']
    total = sum(costs)
    return total / len(costs)
def max_comp_cost(workflow, task):
    """
    Return the largest of the 'comp' costs stored for ``task`` in
    ``workflow.tasks``.
    """
    return max(workflow.tasks[task]['comp'])
def min_comp_cost(workflow, task):
    """
    Return the smallest of the 'comp' costs stored for ``task`` in
    ``workflow.tasks``.
    """
    return min(workflow.tasks[task]['comp'])
def calc_est(workflow, task, machine, solution):
    """
    Calculate the Estimated Start Time of a task on a given machine.

    The result is computed in three steps:

    1. The time at which the task's inputs are ready: the latest
       predecessor finish time, plus a transfer cost
       (``transfer_data / system_bandwith``, truncated to int) for each
       predecessor allocated to a different machine when the bandwidth is
       positive.
    2. The idle slots on ``machine``, derived from its current allocations;
       the open-ended slot after the last allocation is encoded with an end
       of -1.
    3. The first slot the task fits into, or the open-ended slot.

    :param workflow: Workflow providing ``graph`` and ``env``
    :param task: Task to place; must provide ``calc_runtime(machine)``
    :param machine: Candidate machine
    :param solution: Solution holding the allocations made so far
    :returns: The start time chosen for ``task`` on ``machine``
    """
    # Step 1: earliest time at which every predecessor's output is present.
    est = 0
    for parent in workflow.graph.predecessors(task):
        parent_alloc = solution.task_allocations[parent]
        rate = workflow.env.system_bandwith
        transfer = 0
        # Data only has to move when the parent ran on another machine.
        if parent_alloc.machine != machine and rate > 0:
            transfer = int(
                workflow.graph.edges[parent, task]['transfer_data'] / rate)
        est = max(parent_alloc.aft + transfer, est)

    # Step 2: collect the idle slots on this machine.
    booked = solution.list_machine_allocations(machine)
    gaps = []
    if booked:
        previous = None
        for current in booked:
            if previous is None:
                # Idle time before the very first allocation, if any.
                if current.ast != 0:
                    gaps.append((0, current.ast))
            elif current.ast >= est:
                gaps.append((previous.aft, current.ast))
            previous = current
        # Open-ended slot after the latest allocation.
        gaps.append((previous.aft, -1))

    # Step 3: take the first slot that can host the task. A task with a low
    # est but a long runtime may fit in no gap, in which case it ends up in
    # the open-ended slot and starts late.
    for start, end in gaps:
        candidate = start if est < start else est
        if candidate + task.calc_runtime(machine) <= end or end < 0:
            return candidate
    return est
def insertion_policy(workflow, position=0, progress=True):
    """
    Allocate tasks to machines following the insertion-based policy
    outlined in Topcuoglu et al. (2002).

    Tasks are taken in the order returned by ``workflow.sort_tasks('rank')``.
    The first task is placed at time 0 on the machine returned by its
    ``calc_mininum_runtime``; every later task goes to the machine that
    gives the earliest finish time (``calc_est`` + runtime).

    :param workflow: Workflow to allocate (provides ``env`` and
        ``sort_tasks``)
    :param position: Position of the tqdm progress bar
    :param progress: When True, display a tqdm progress bar
    :returns: Solution with every task allocated and ``makespan`` set to
        the latest finish time of any allocation
    :raises RuntimeError: If there is no machine to allocate a task to
    """
    makespan = 0
    sorted_tasks = workflow.sort_tasks('rank')
    solution = Solution(workflow.env.machines)

    pbar = None
    if progress:
        pbar = tqdm(total=len(sorted_tasks), unit="Tasks",
                    desc='Allocating tasks', position=position)

    for index, task in enumerate(sorted_tasks):
        if index == 0:
            # Treat the first task differently, as it's the easiest to get
            # the lowest cost: nothing is scheduled yet, so it starts at 0.
            m, runtime = task.calc_mininum_runtime(workflow.env)
            ast = 0
            aft = runtime
        else:
            m = None
            aft = None  # Best finish time found so far for this task
            for machine in workflow.env.machines:
                est = calc_est(workflow, task, machine, solution)
                finish = est + task.calc_runtime(machine)
                # Keep the first machine, then any strictly earlier finish.
                if aft is None or finish < aft:
                    aft = finish
                    m = machine
            if m is None:
                raise RuntimeError(
                    "No machines available to allocate tasks to")
            ast = aft - task.calc_runtime(m)

        # The first task counts towards the makespan as well.
        makespan = max(aft, makespan)
        solution.add_allocation(task=task, machine=m, ast=ast, aft=aft)
        if pbar is not None:
            # tqdm's update() takes an increment: one task per iteration.
            pbar.update()

    if pbar is not None:
        pbar.close()
    solution.makespan = makespan
    return solution
def insertion_policy_oct(workflow, oct_rank_matrix):
    """
    Allocate tasks to machines using the Optimistic Cost Table (OCT).

    Tasks are taken in the order returned by ``workflow.sort_tasks('rank')``
    and each is allocated to the machine with the smallest optimistic
    finish time, i.e. ``EFT(task, machine) + OCT(task, machine)``.

    The first element of ``workflow.tasks`` is treated as the entry task:
    it starts at time 0, so its EFT is simply its runtime. For every other
    task the start time comes from ``calc_est``.

    :param workflow: Workflow to allocate
    :param oct_rank_matrix: dict mapping ``(task, machine)`` to an OCT
        value; regenerated with ``generate_ranking_matrix`` when empty/None
    :returns: Solution with every task allocated and ``makespan`` set to
        the latest finish time of any allocation
    :raises RuntimeError: If there is no machine to allocate a task to
    """
    if not oct_rank_matrix:
        oct_rank_matrix = generate_ranking_matrix(workflow)

    makespan = 0
    sorted_tasks = workflow.sort_tasks('rank')
    solution = Solution(workflow.env.machines)
    # Look the entry task up once instead of rebuilding a list per task.
    entry_task = next(iter(workflow.tasks), None)

    for task in sorted_tasks:
        is_entry = entry_task is not None and task == entry_task
        m = None
        min_oeft = None
        aft = None
        runtime_on_m = None
        for machine in workflow.env.machines:
            runtime = task.calculated_runtime(machine)
            if is_entry:
                eft = runtime
            else:
                # graph.predecessors() yields an iterator, which is always
                # truthy, so calc_est was effectively always used here; it
                # also handles tasks without predecessors.
                eft = calc_est(workflow, task, machine, solution) + runtime
            oeft = eft + oct_rank_matrix[(task, machine)]
            # Keep the first machine, then any strictly smaller OEFT.
            if min_oeft is None or oeft < min_oeft:
                min_oeft = oeft
                m = machine
                aft = eft
                runtime_on_m = runtime
        if m is None:
            raise RuntimeError("No machines available to allocate tasks to")

        if is_entry:
            ast = 0
        else:
            ast = aft - runtime_on_m
            # NOTE(review): the machine is only recorded on non-entry
            # tasks, as before - confirm whether the entry task should
            # carry it too.
            task.machine = m

        # The entry task counts towards the makespan as well.
        makespan = max(aft, makespan)
        solution.add_allocation(task=task, machine=m, ast=ast, aft=aft)

    solution.makespan = makespan
    return solution
def fcfs_allocation(workflow, greedy, seed):
    """
    Allocate tasks first-come-first-served in topological order.

    Each task is given to the machine that becomes free the earliest
    (``next_available_start``). Its start time is the later of that
    machine's free time and the arrival of its predecessors' data
    (``calc_earliest_start_time_on_machine``). Tasks without predecessors
    therefore start when their machine is free rather than always at time
    0, so several head nodes never overlap on one machine.

    :param workflow: Workflow to allocate
    :param greedy: Currently unused
    :param seed: Currently unused
    :returns: Solution with every task allocated and ``makespan`` set to
        the latest finish time of any allocation
    """
    makespan = 0
    sorted_tasks = workflow.sort_tasks("topological")
    solution = Solution(workflow.env.machines)
    # Time at which each machine next becomes free.
    earliest_alloc = {m: 0 for m in workflow.env.machines}

    for task in sorted_tasks:
        nmachine, start_val = next_available_start(earliest_alloc)
        # With no predecessors this simply yields start_val.
        ast = calc_earliest_start_time_on_machine(
            task, nmachine, start_val, workflow, solution)
        aft = ast + task.calculated_runtime(nmachine)
        earliest_alloc[nmachine] = aft
        makespan = max(makespan, aft)
        solution.add_allocation(task, nmachine, ast=ast, aft=aft)

    solution.makespan = makespan
    return solution
# TODO address control flow if predecessor doesn't exist due to multiple
# head nodes
def find_next_available_machine(start, earliest_allocs):
    """
    Return the first machine in ``earliest_allocs`` together with the time
    it can start a task that is ready at ``start``.

    Only the first machine in iteration order is ever considered: the
    returned time is the later of ``start`` and that machine's earliest
    allocation time.
    NOTE(review): the name suggests a search over all machines - confirm
    whether inspecting only the first one is intended.

    Parameters
    ----------
    start
        Estimated start time of the task
    earliest_allocs
        Mapping of machine to the earliest time it can take a new task

    Returns
    -------
    Name of the machine and the earliest time it can run the task

    Raises
    ------
    RuntimeError
        If ``earliest_allocs`` is empty
    """
    nothing = object()
    first_machine = next(iter(earliest_allocs), nothing)
    if first_machine is nothing:
        raise RuntimeError(
            'Unable to find latest allocation time - something is wrong')
    free_at = earliest_allocs[first_machine]
    return first_machine, (free_at if start < free_at else start)
def next_available_start(earliest_allocs):
    """
    Return the machine that becomes available first, and when.

    Parameters
    ----------
    earliest_allocs
        Mapping of machine to the earliest time it can take a new task

    Returns
    -------
    Tuple ``(machine, time)`` for the smallest time in the mapping; on a
    tie the machine that comes first in iteration order wins. For an empty
    mapping ``(None, -1)`` is returned.
    """
    if not earliest_allocs:
        return None, -1
    # min() returns the first minimal key, and needs no sentinel value that
    # could be confused with a real time.
    machine = min(earliest_allocs, key=earliest_allocs.__getitem__)
    return machine, earliest_allocs[machine]
def calc_earliest_start_time_on_machine(task, machine, pred_start, workflow,
                                        solution):
    """
    Given a task and machine pair, calculate the time the task can actually
    start.

    This takes into account a task that is allocated to a machine that its
    predecessor(s) were not allocated to, and which therefore has
    additional transfer costs.

    Parameters
    ----------
    task :
        Task for which we are finding the earliest start time
    machine :
        Machine that has been picked for the task allocation
    pred_start :
        Lower bound on the start time; returned unchanged when no
        predecessor pushes the start later
    workflow :
        Workflow providing ``graph`` and ``env.system_bandwith``
    solution :
        Used to retrieve the previous allocation (machine and finish time)
        of each predecessor of ``task``

    Notes
    ------
    A task cannot start before every predecessor has finished: e.g. if
    (1,4), (2,4), and (3,4) are all edges, Task 4 has to wait for Tasks 1,
    2, and 3 to complete. For a predecessor on a different machine, the
    transfer time ``transfer_data / system_bandwith`` (truncated to int) is
    added to its finish time, provided the bandwidth is positive.

    Returns
    -------
    The latest of ``pred_start`` and every predecessor's finish time plus
    transfer cost
    """
    earliest = pred_start
    for parent in workflow.graph.predecessors(task):
        parent_alloc = solution.task_allocations[parent]
        bandwidth = workflow.env.system_bandwith
        transfer = 0
        # Data only has to move when the parent ran on another machine.
        if parent_alloc.machine != machine and bandwidth > 0:
            transfer = int(
                workflow.graph.edges[parent, task]['transfer_data']
                / bandwidth)
        earliest = max(parent_alloc.aft + transfer, earliest)
    return earliest
def latest_allocation(task, solution, workflow):
    """
    Find the allocation of the predecessor of ``task`` that finishes last.

    Parameters
    ----------
    task
        Task whose predecessors are inspected
    solution
        Solution whose ``task_allocations`` values expose ``task`` and
        ``aft`` attributes
    workflow
        Workflow providing ``graph.predecessors``

    Returns
    -------
    The allocation with the largest ``aft`` among the allocations of the
    predecessors of ``task``; on a tie the one that comes first in
    ``solution.task_allocations`` wins.

    Raises
    ------
    IndexError
        If no predecessor of ``task`` has an allocation
    """
    # Only membership is needed, so the predecessors themselves are not
    # sorted and need no ``aft`` attribute of their own.
    predecessors = set(workflow.graph.predecessors(task))
    pred_task_allocations = [
        alloc for alloc in solution.task_allocations.values()
        if alloc.task in predecessors
    ]
    if not pred_task_allocations:
        raise IndexError("No allocated predecessors found for task")
    # max() keeps the first of several equal finish times, like a stable
    # descending sort followed by taking element 0.
    return max(pred_task_allocations, key=lambda alloc: alloc.aft)