文章目录
前言
软件工程的第一个作业——使用编程实现PERT图,在此分享我的实现思路。
代码开源地址:https://github.com/Aki-Hoyue/projects/tree/main/SE/PERT
PS:如果有能力,建议观看GitHub中的英文版报告,因为一开始写的就是英文版,本篇可能有些地方有翻译痕迹。如果对代码部分有任何问题,欢迎留言讨论。
介绍与环境
计划评审技术(英语:Program Evaluation and Review Technique,简称“PERT”),主要目的是针对不确定性较高的工作项目,以网络图规划整个专案,以排定期望的专案时程。PERT图把项目描绘成一个由编号结点(圆形或者方形)构成的网络图,编号节点代表着项目中的任务。每个结点都被编号,并且标注了任务、工期、开始时间和完成时间。线条上的箭头方向标明了任务次序,并且标识出在开始一个任务前必须完成那些任务。
该评审技术的主轴为“乐观时间”、“最有可能时间”及“悲观时间”
最核心公式为“专案期望时间”= ((乐观时间+悲观时间+4*最可能时间))/6
我使用的是python进行编程,需要的环境有:
- Python 3.x及更高版本Python包至少需要:
- os:提供文件操作。
- json:提供JSON文件操作。
- math:提供一些数学函数。
- scipy.stats: 提供统计函数和分布。
- networkx:创建、操作和分析复杂网络图。
- matplotlib.pyplot: 提供用于创建可视化效果的函数。
- queue:提供用于实现队列的类。
- tabulate:从数据创建格式化的表格,这对于以清晰和有组织的方式显示数据很有用。推荐使用Anaconda,一个集成环境,安装具体可以看:
- 数据文件遵循JSON格式
使用说明
数据文件默认数据文件为data.json,与代码在同一目录下。
它遵循JSON格式,是一个列表,其中每个节点都是一个字典,字典的示例格式是:
{
"Activity Code": "A",
"Immediate Predecessor": ["B", "C"],
"Optimistic Estimate": 12,
"Most Likely Estimate": 15,
"Pessimistic Estimate": 18
}
对于每个活动的前置,如果它不存在则是一个空列表[]
。除此之外,我在文件夹中还提供了两个测试数据:data1.json(与data.json数据相同),data2.json
在终端中运行程序,会显示一个菜单。如图:
你在进行计算前,必须先加载数据文件。 按1或2以加载文件。
- Press1: 加载默认文件data.json
- Press2: 加载一个自定义文件,该文件位于程序的同一文件夹或给出绝对路径。例如:
成功加载文件后,你将看到通知:Data loaded successfully!(数据加载成功!),并回到菜单上。
如果输入了错误的文件,程序将报告错误并退出。
然后,你可以选择选项3 - 9来实现相应的功能。 e.g.(使用数据文件data2.json)
-
-
Press 4: 计算每个活动的时间,包括最早开始时间、最早完成时间、最晚开始时间、最晚完成时间、松弛时间以及是否处于关键路径中。(Calculate the time for each activity including Earliest Start Time, Earliest Finish Time, Latest Start Time, Latest Finish Time, Slack Time and Whether it is in the critical path.)
-
Press 5: 找到所有的关键路径。(Find the Critical Path.)
-
Press 6: 计算期望项目完成时间。(Calculate the expected project completion time.)
-
Press 7: 计算在目标时间内完成项目的概率。假设我们计算在大于145天内完成项目的概率。我们输入目标时间为145,分布模式为l(larger)。
(Calculate the probability of completing the project by a targeted time.
Assume that we calculate the probability to complete the project in larger than 145 days.
We input Targeted time is 145 and Distribution mode is l(larger).)
-
Press 8: 计算所有项目(使用默认数据)。
如果不加载文件,它将加载默认数据并在项之前显示。 如果你已经加载了一个文件,它将根据你的数据进行计算。
Calculate all the items(By default data).
If you don't load a file, it will load default data and show before items. If you have loaded a file, it will calculate by your data.
-
Press 9: 展示活动拓扑图,将在一个新的窗口中打开。
Draw an activity graph.It will show in a new window.
-
设计思路
存储设计
设计存储的时候,我设计了一个列表PERT
去存储输入的数据。每一项都是一个字典,初始化为PERT = []
同时定义一个类 ,其构造函数将类Graph
的每个对象初始化为一个空字典 graph
字典,以获取有关此图的信息。
在类中的存储,我使用字典里嵌套字典的方式,便于遍历和存储信息。初始化代码如下:
# Define an array to store data and each item is a dictionary
PERT = []
class Graph:
def __init__(self):
self.graph = {}
在函数file_load
中,它从JSON文件中读取数据并将其存储在PERT
数组中。 函数file_load
的参数是一个数据文件目录,可以通过用户输入传入,默认值为data.json
。
在设计中,首先确定路径,如果路径不存在,则返回错误消息,否则将读取的数据存储在临时变量data
中。
下一步是遍历整个data
,一次读入一个字典。 Optimistic Estimate
, Most Likely Estimate
and Pessimistic Estimate
数据可用于使用公式直接计算Estimate During
和方差。 这些信息将作为PERT表的元素写入。
最后,返回1,表示文件内容已成功读取。
# Load data from json file
def file_load(filename="data.json"):
if os.path.exists(filename):
with open(filename, "r") as data_table:
data = json.load(data_table)
else:
print("Error, file not found!")
print_separator()
return 0
# Iterate through data
for item in data:
activity_code = item['Activity Code']
immidiate_predecessor = item['Immediate Predecessor'] # An array
optimistic_estimate = item['Optimistic Estimate']
most_likely_estimate = item['Most Likely Estimate']
pessimistic_estimate = item['Pessimistic Estimate']
estimate_during = (optimistic_estimate + 4 * most_likely_estimate + pessimistic_estimate) / 6
variance = ((pessimistic_estimate - optimistic_estimate) / 6) ** 2
PERT.append({"activity code": activity_code, "immidiate_predecessor": immidiate_predecessor, "estimate_during": estimate_during, "variance": variance})
return 1
读入数据后,我们仍然需要将数据加载到Graph
类中,我在Graph
类中设计了成员函数add_activity
。 它的参数是存储输入的表PERT
。
在添加数据到图中时,除了添加一些必要的信息外,我还设计了next_activity
列表来存储活动的后驱节点,这样就可以很容易地找到earliest start
和earliest finish
。
# Add a node to the graph
def add_activity(self, PERT):
immidiate_predecessor = PERT['immidiate_predecessor']
time = PERT['estimate_during']
variance = PERT['variance']
# Set predecessor to the new node. Next_activity is an array that stores the next activity.
for i in immidiate_predecessor:
if (i in self.graph):
self.graph[i]["next_activity"].append(PERT['activity code'])
# Add the new node to the graph
self.graph[PERT['activity code']] = {
"activity_code": PERT['activity code'],
"estimate_during": time,
"variance": variance,
"next_activity": [],
"before_activity": immidiate_predecessor,
"earliest_start_time": 0,
"earliest_finish_time": 0,
"latest_start_time": 0,
"latest_finish_time": 0,
"slack_time": -1,
"critical_path": False
}
计算最早开始和最早完成时间
在函数earliest_time
中,我们初始化没有前驱的节点作为起点,然后通过的遍历后驱来依次计算ES
和EF
。
在第一层循环中,逐个迭代对象graph
字典中的活动名,在第二层循环中,循环遍历当前活动的后驱,如果后驱的ES
小于当前节点的EF
,则后驱的ES
被替换为当前节点的EF
,并重新计算后驱的EF
。
在循环结束时,所有活动的ES
和EF
都已经计算完毕。
# Calculate the earliest start time and earliest finish time
def earliest_time(self):
# Initialize the earliest start time and earliest finish time
for i in self.graph:
if (self.graph[i]["before_activity"] == []):
self.graph[i]["earliest_finish_time"] = self.graph[i]["estimate_during"]
# Caculate the earliest start time and earliest finish time
for i in self.graph:
for j in self.graph[i]["next_activity"]:
if (self.graph[j]["earliest_start_time"] < self.graph[i]["earliest_finish_time"]):
self.graph[j]["earliest_start_time"] = self.graph[i]["earliest_finish_time"]
self.graph[j]["earliest_finish_time"] = self.graph[j]["earliest_start_time"] + self.graph[j]["estimate_during"]
计算最晚开始和最晚完成时间
在函数latest_time
中,我们同样先将没有后驱的节点初始化,它们的LF
都是最大的 LF
。故我们需要先找到最大的EF
才能找到最大的LF
。
在第一层循环中,倒序遍历活动名,第二层循环,循环遍历当前活动的前驱节点,如果前驱节点的LF
大于当前节点的LS
,则前驱节点的LF
会被当前节点的LS
取代,并重新计算该节点的LS
。
最后,循环结束是会计算处所有活动的LS
和LF
。
# Calculate the latest start time and latest finish time
def latest_time(self):
# Initialize the latest start time and latest finish time
estimate_time = 0
for i in reversed(self.graph):
if (self.graph[i]["next_activity"] == []):
estimate_time = max(estimate_time, self.graph[i]["earliest_finish_time"])
self.graph[i]["latest_finish_time"] = estimate_time
self.graph[i]["latest_start_time"] = self.graph[i]["latest_finish_time"] - self.graph[i]["estimate_during"]
# Calculate the latest start time and latest finish time
for i in reversed(self.graph):
for j in self.graph[i]["before_activity"]:
if (self.graph[j]["latest_finish_time"] == 0 or self.graph[j]["latest_finish_time"] > self.graph[i]["latest_start_time"]):
self.graph[j]["latest_finish_time"] = self.graph[i]["latest_start_time"]
self.graph[j]["latest_start_time"] = self.graph[j]["latest_finish_time"] - self.graph[j]["estimate_during"]
计算松弛时间
计算松弛时间(slack time)非常简单,只需计算每个项目的FS - ES
即可。
# Calculate the slack time LS-ES
def slack_time(self):
# Calculate the slack time
for i in self.graph:
self.graph[i]["slack_time"] = self.graph[i]["latest_start_time"] - self.graph[i]["earliest_start_time"]
计算关键路径
确定一项活动是否为关键路径(critical path)也非常简单; 如果活动的松弛时间为 0,则它是关键路径,否则不是。
# Calculate the critical path that Slack = 0
def critical_path(self):
# Calculate the critical path
for i in self.graph:
if (self.graph[i]["slack_time"] == 0):
self.graph[i]["critical_path"] = True
输出
本部分是输出的集合,用于打印上述函数的结果。
首先,我们应该在输出之前执行那些计算函数,以便为输出提供足够的数据。 在函数初始化中调用这些计算函数initization
。
# Initialization before printing
def initization(self):
self.earliest_time()
self.latest_time()
self.slack_time()
self.critical_path()
然后我们输出Estimate duration
,将其输出为表格,使用tabulate
模块进行制表,将每个活动的数据写入Estimate_table
,并设置标题和表格格式。
# Print the Estimate duration
def print_estimate_duration(self):
print("Estimate duration: ")
Estimate_table = [["Activity", "Estimate Duration", "Variance"]]
for i in self.graph:
Estimate_table.append([self.graph[i]["activity_code"], self.graph[i]["estimate_during"], self.graph[i]["variance"]])
print(tabulate(Estimate_table, headers="firstrow", tablefmt='fancy_grid', numalign="center"))
同理,将必要的数据写入table
中,输出ES, EF, LS, LF
等内容,并设置标题和表格格式。
# Print the earliest/latest start time and finish time
def print_PERTtable(self):
print("PERT table: ")
table = [["Activity", "Ealiest Start", "Ealiest Finish", "Latest Start", "Latest Finish", "Slack Time", "Critical Path"]]
for i in self.graph:
table.append([
self.graph[i]["activity_code"],
self.graph[i]["earliest_start_time"],
self.graph[i]["earliest_finish_time"],
self.graph[i]["latest_start_time"],
self.graph[i]["latest_finish_time"],
self.graph[i]["slack_time"],
self.graph[i]["critical_path"]])
print(tabulate(table, headers="firstrow", tablefmt='fancy_grid'))
输出关键路径,用->
连接每个关键路径
# Print the critical path
def print_critical_path(self):
print("Critical path: ")
for i in self.graph:
if (self.graph[i]["critical_path"]):
print(self.graph[i]["activity_code"], end=" ")
if (self.graph[i]["next_activity"] != []):
print("->", end=" ")
print()
在输出预计项目完成时间时,需要通过keys()
属性找到最后一个最大的LF/EF
并输出。
# Print the expected project completion time
def print_expected_project_completion_time(self):
keys = list(self.graph.keys())
print("Expected project completion time: %s" % self.graph[keys[-1]]["latest_finish_time"])
计算概率
为了计算目标完成时间比实际完成时间larger/smaller (l/s)的概率,设计了函数probability
。 它有两个参数,一个是目标完成时间targeted
,另一个是分布模式,可以是它可以是大于/小于larger/smaller (l/s)
基于公式Standard Deviation
和Z-Score
可以计算,概率可以基于scipy.stats
模块计算。
# Calculate the probability of completing the project. mode = larger/smaller
def probability(self, targeted, mode):
if (mode == "l"):
mode = "larger"
elif (mode == "s"):
mode = "smaller"
var_sum = 0
for i in self.graph:
if (self.graph[i]["critical_path"]):
var_sum += self.graph[i]["variance"]
standard_deviation = math.sqrt(var_sum)
keys = list(self.graph.keys())
expected = self.graph[keys[-1]]["latest_finish_time"]
zscore = (targeted - expected) / standard_deviation
if (mode == "larger"):
probability = 1 - st.norm.cdf(zscore)
else:
probability = st.norm.cdf(zscore)
print("Z-score: {}\nProbability of project completion {} than {} is: {:.2%} ({}). ".format(zscore, mode, targeted, probability, probability))
画图
在函数graph_table
中,G用作网格图,graph_dict
作为对象的graph
,而队列Q则被设计用于使用FIFO 中生成节点和边。pos
作为一个地图(map)来存储每个节点的坐标,postion
字典记录了每个节点的row
和col
以便生成之后节点的坐标。
首先,我先生成一条由多个关键路径组成的直线,用于代表关键路径,初始节点坐标为(0,0),依次加1并添加边,并附上边的标签(活动名称)。 对于添加的每个节点,该节点都会进入队列,以便此后可以按照顺序生成其子节点。
接下来设置一个指向第一个节点的初始化节点。(Start)
接着,生成非关键路径的节点。 一个节点根据其后驱节点的数量生成一个range_col,它代表其后驱节点纵坐标的最大值,每个后驱节点的纵坐标相差1。当只有一个后驱的情况下,其后驱节点的纵坐标与当前节点相同。
特殊情况:如果当前节点没有后驱并且不是最后一个关键路径节点,则它指向最后一个关键路径节点。(说明它是最后一位节点并直接指向END)
接下来遍历当前节点的所有后驱,如果后驱不在已添加的节点列表中,则添加它,添加实线边并将后驱入队。
否则,判断当前节点的EF
和后驱节点ES
的大小,如果当前节点的EF
较大,则需要将后代节点的原有的边设置为虚线,并添加新的实线边。 如果后驱节点的ES
很大,则直接添加虚线边。
这里有一个特别判断,如果当前节点的EF
和后驱节点的ES
大小相同,则需要将后代节点原来的非关键路径的边设置为虚线,并添加实线边。
最后通过matplotlib.pyplot
和networkx
模块分别绘制节点、边以及边的标签,并最后展示出来。
# Graph the table
def graph_table(self):
graph_dict = self.graph
G = nx.DiGraph()
pos = {}
Q = queue.Queue()
postion = {}
# Set the critical path
row = 0
for i in graph_dict:
if (graph_dict[i]["critical_path"]):
Q.put(graph_dict[i]["activity_code"])
G.add_node(graph_dict[i]["activity_code"])
postion[graph_dict[i]["activity_code"]] = {"row": 0, "col": 0}
pos[graph_dict[i]["activity_code"]] = (row, 0)
row += 1
postion[graph_dict[i]["activity_code"]]["row"] = row
postion[graph_dict[i]["activity_code"]]["col"] = 0
for j in graph_dict[i]["before_activity"]:
if(j in G.nodes()):
G.add_edge(graph_dict[j]["activity_code"], graph_dict[i]["activity_code"], name=graph_dict[i]["activity_code"])
# Set the startpoint and endpoint
G.add_node("Start")
pos["Start"] = (-1, 0)
G.add_edge("Start", Q.queue[0], name = Q.queue[0])
last_critical_point = Q.queue[Q.qsize() - 1]
dash = []
# Set the non-critical path
while not Q.empty():
cur = Q.get()
# print(cur)
if len(graph_dict[cur]["next_activity"]) > 1:
range_col = math.floor(math.log2(len(graph_dict[cur]["next_activity"])))
elif len(graph_dict[cur]["next_activity"]) == 1:
range_col = postion[cur]["col"]
if graph_dict[cur]["next_activity"] == [] and cur != last_critical_point:
G.add_edge(cur, last_critical_point)
dash.append((cur, last_critical_point))
continue
for next_node in graph_dict[cur]["next_activity"]:
if next_node not in G.nodes():
G.add_node(next_node)
col = range_col
row = (postion[cur]["row"]) * (1 - (col / 10.0)) + 1
postion[next_node] = {"row": row, "col": col}
pos[next_node] = (row, col)
range_col -= 1
if range_col == 0:
range_col -= 1
G.add_edge(cur, next_node, name=next_node)
Q.put(next_node)
else:
cur_EF = graph_dict[cur]["earliest_finish_time"]
next_ES = graph_dict[next_node]["earliest_start_time"]
if cur_EF > next_ES:
pred = list(G.predecessors(next_node))
for j in range(len(pred)):
G.remove_edge(pred[j], next_node)
G.add_edge(pred[j], next_node)
dash.append((pred[j], next_node))
G.add_edge(cur, next_node, name=next_node)
elif cur_EF < next_ES:
for j in graph_dict[next_node]["before_activity"]:
if graph_dict[j]["critical_path"]:
G.add_edge(cur, j)
dash.append((cur, j))
else:
cur == next_node
pred = list(G.predecessors(next_node))
for j in range(len(pred)):
if graph_dict[pred[j]]["critical_path"] and not graph_dict[cur]["critical_path"]:
G.remove_edge(pred[j], next_node)
G.add_edge(pred[j], next_node)
dash.append((pred[j], next_node))
G.add_edge(cur, next_node, name=next_node)
# Draw the graph
nx.draw_networkx_nodes(G, pos)
# nx.draw_networkx_edges(G, pos,label=True)
dashed_edges = [(u,v) for u,v in G.edges if (u,v) in dash]
nx.draw_networkx_edges(G, pos, edgelist=dashed_edges, style='dashed')
solid_edges = [(u,v) for u,v in G.edges if (u,v) not in dash]
nx.draw_networkx_edges(G, pos, edgelist=solid_edges, style='solid')
nx.draw_networkx_edge_labels(G, pos, edge_labels=nx.get_edge_attributes(G,'name'),font_family="sans-serif", font_size=10)
plt.show()
辅助函数
程序中还有一些辅助函数,可以帮助生成和判断一些情况。
print_separator
函数用于生成分隔符以允许分隔重要的输出。
load_test
函数用于测试数据是否已经加载,根据情况对其参数类型进行不同的分析。
- 当Type = 0时,函数会输出加载消息,返回1表示加载成功并输出成功消息,返回0表示已经加载。
- 当Type = 2时,情况与Type = 0非常相似,不同的是返回1时不执行任何操作。
- 当Type=1时,会先检查是否需要加载。 返回1表示已经加载,返回0表示需要先加载,并输出一条消息。
# Print separator
def print_separator():
print("\n------------------------------------------------------------\n")
# Loading test.
def loading_test(type):
if type == 0:
# Type = 0(load infomation -> return 1(load successfully infomation) or 0(has been loaded infomation) for default file
if not PERT:
print_separator()
print("Data loaded successfully!")
print_separator()
return 1
else:
print_separator()
print("Data has been loaded!")
print_separator()
return 0
elif type == 1:
# Type = 1(loading check -> return 1(print separator) or 0(need to load))
if not PERT:
print_separator()
print("Please load data first!")
print_separator()
return 0
else:
print_separator()
return 1
elif type == 2:
# Type = 2(for custom file like type 0)
if not PERT:
return 1
else:
print_separator()
print("Data has been loaded!")
print_separator()
return 0
主函数
主要功能比较简单,主要是一个菜单,根据用户输入进行功能调用和反馈。
# Main function
# Initialization
PERT_graph = Graph()
print("Welcome to PERT project management system!\nAuthor: Hoyue")
while True:
print("Menu: ")
print("1. Load data from default file(data.json).")
print("2. Load data from custom file.")
print("3. Calculate the estimated duration for each activity.")
print("4. Calculate the time for each activity including Earliest Start Time, Earliest Finish Time, Latest Start Time, Latest Finish Time, Slack Time and Whether it is in the critical path.")
print("5. Find the Critical Path.")
print("6. Calculate the expected project completion time.")
print("7. Calculate the probability of completing the project by a targeted time.")
print("8. Calculate all the items(By default data).")
print("9. Draw an activity graph.")
print("10. Exit.")
choice = input("Please press the number to choose the function: ")
if choice == "1":
if loading_test(0):
if file_load():
for i in range(len(PERT)):
PERT_graph.add_activity(PERT[i])
elif choice == "2":
if loading_test(2):
print_separator()
filename = input("Please input your filename in the software folder(Format as data.json): ")
if file_load(filename):
for i in range(len(PERT)):
PERT_graph.add_activity(PERT[i])
print("Data loaded successfully!")
print_separator()
elif choice == "3":
if loading_test(1):
PERT_graph.initization()
PERT_graph.print_estimate_duration()
print_separator()
elif choice == "4":
if loading_test(1):
PERT_graph.initization()
PERT_graph.print_PERTtable()
print_separator()
elif choice == "5":
if loading_test(1):
PERT_graph.initization()
PERT_graph.print_critical_path()
print_separator()
elif choice == "6":
if loading_test(1):
PERT_graph.initization()
PERT_graph.print_expected_project_completion_time()
print_separator()
elif choice == "7":
if loading_test(1):
PERT_graph.initization()
print("For cauculating the probability of completing the project, please input the targeted time and distribution mode(larger/smaller).")
targeted = float(input("Targeted time: "))
mode = input("Distribution mode(larger/smaller or l/s): ")
if (mode != "larger" and mode != "smaller" and mode != "l" and mode != "s"):
print_separator()
print("Error input! Please input again!")
print_separator()
continue
PERT_graph.probability(targeted, mode)
print_separator()
elif choice == "8":
if not PERT:
file_load()
for i in range(len(PERT)):
PERT_graph.add_activity(PERT[i])
print_separator()
PERT_graph.initization()
PERT_graph.print_estimate_duration()
PERT_graph.print_PERTtable()
PERT_graph.print_critical_path()
PERT_graph.print_expected_project_completion_time()
print("For cauculating the probability of completing the project, please input the targeted time and distribution mode(larger/smaller).")
targeted = float(input("Targeted time: "))
mode = input("Distribution mode(larger/smaller or l/s): ")
if (mode != "larger" and mode != "smaller" and mode != "l" and mode != "s"):
print_separator()
print("Error input! Please input again!")
print_separator()
continue
PERT_graph.probability(targeted, mode)
print_separator()
elif choice == "9":
if loading_test(1):
PERT_graph.initization()
print("Activity graph:")
print("Notice: If you don't have some fonts installed, you may get an error, just ignore it.")
PERT_graph.graph_table()
print_separator()
elif choice == "10":
print("Thank you! Goodbye!")
break
else:
print_separator()
print("Error input! Please input again!")
print_separator()
限制
我在设计的时候,只考虑了默认顺序下输入的情况,如果输入的数据存在前驱位于后驱后面的情况,可能会出现问题。以及输出的时候只考虑了末位,如果改为随机数据时可以试试增加变量存储。
还有就是在画图的时候,子节点的range_col和row的生成公式可以自定义,我设计并不是最优的情况,可能导致图看起来没这么好看。
后记
这是我使用python实现的PERT项目,练习python非常合适。当然也可以使用C++/Java等语言,核心思想都差不多。如果对代码部分有任何问题,欢迎留言讨论。
Comments NOTHING