求解分配问题(一) Kuhn

编程入门 行业动态 更新时间:2024-10-25 10:26:09

求解<a href=https://www.elefans.com/category/jswz/34/1770682.html style=分配问题(一) Kuhn"/>

求解分配问题(一) Kuhn

分配问题(Assignment Problem)很常见,例如给多个开发者安排开发任务,一个开发者只能分配到一个任务,一个任务也只能由一个人完成,不同的人对不同的任务有不同的完成时间,那么如何进行安排使得总的开发时间最短就是一个典型的分配问题。如果说传输问题是一类特殊的线性规划问题(传输问题定义),那么分配问题则是一类特殊的传输问题,如果我们把传输问题中所有Supplier和Demander的供应量或需求量都降到1,那么这就变成了一个分配问题

既然分配问题属于传输问题,那么Stepping-Stone算法或者Modified Distribution算法当然也可以直接应用于分配问题,但是我们可以想象一下,对于一个 n × n n\times n n×n的分配问题,它任何一个可行解的矩阵形式都是只有n个1,而迭代中非退化的条件是1的数量等于 n × n − 1 n\times n-1 n×n−1,这意味着每次迭代计算都要处理退化情况,所以直接应用传输问题的算法效率非常低。文本介绍处理分配问题常用的一种算法,一般称之为匈牙利算法(Hungarian Algorithm),不过实际上匈牙利算法的原始版本并非处理传输问题,下面介绍的其实是Harold W.Kuhn和James Munkre在此基础上提出的,所以严格说应该叫Kuhn-Munkre算法

算法主流程

先举个案例来方便描述:我们就假设有A,B,C,D四个开发任务,1,2,3,4四个开发者,各个开发者对开发任务的评估时间(单位: 分钟)可以用下面的cost矩阵来描述

Task \ Person1234
A21090180160
B10070130200
C175105140170
D8065105120

分配(A,1),(B,2),(C,3),(D,4)就是一种可行解,表述成矩阵形式:

Assign1234
A1000
B0100
C0010
D0001

对于上面的cost矩阵,我们通过观察可以发现,对某一个行或者某一列统一减去某个值(例如下表中将第一行统一减去10),其实并不影响问题的描述,也就是说通过行或列的减值操作,得到新的cost矩阵,在它上面计算的结果和原始的cost矩阵是一样的

Task \ Person1234
A210-10=20090-10=80180-10=170160-10=150
B10070130200
C175105140170
D8065105120

再考虑,如果某个开发者对一个task的评估时间是零,那么我们当然优先给他分配这个任务;如果每个开发者都有一个互不相同的评估时间是零的task,那么这种分配就是最优的,因为总时间是零。
结合这两个特点,我们可以想到,对各行各列,都减去那一行或列的最小值,这样就可以转化到一个包含多个零的cost矩阵,如果这个cost矩阵中存在没有行列冲突的4个零,那么我们就已经找到了最优解,例如下面的形式,分配(A,1),(C,2),(B,3),(D,4)就是最优分配

Task \ Person1234
A080170150
B100700200
C1750140170
D80651050

所以算法的第一步就是进行行列的减法操作,先将每一行减去该行的最小值,再将每一列减去该列的最小值,这样就可以产生多个零:

行减(Row Reduction):

Task \ Person1234
A12009070
B30060130
C7003565
D1504055

列减(Column Reduction):

Task \ Person1234
A10505515
B1502575
C550010
D0050

现在我们需要评估一下当前的cost矩阵,是否存在互相独立的4个零,所谓互相独立,指它们中任意两个既不在同一行也不在同一列;如何评估呢,我们在cost矩阵上划线,可以是在行上也可以在列上,用最少的线来覆盖到所有的零,如果刚好是4条,那么就存在相互独立的4个零;如果小于4条,则不存在,这是一个可以数学证明的推论,我们直接接受这个设定。在上面这个cost矩阵中,我们发现只用3条线就可以覆盖到所有的零


这也就意味着,我们找不到相互独立的四个零,当前不存在一个cost为零的最优解;怎么办呢,一个自然的想法就是在划线外的区域产生更多的零,这样划出的线可能达到4条了;那么怎么只对划线外的单元格进行减值操作呢,注意行减或列减操作必须是整行或整列,不能对单个单元格,但是我们可以组合行减和列减达到目的:先对所有非划线行的单元进行减值操作:

然后对所有在划线列上的单元进行加值操作:

这样的最终效果就是对所有划线外的单元减去 h h h,对划线的交点单元加上 h h h,而这个 h h h我们可以取划线外单元格的最小值,即10,所以,减值的结果变成:

Task \ Person1234
A950555
B502565
C45000
D010150

再来评估,发现仍然只能用3条线来覆盖,我们还没找到最优解:

继续对矩阵进行减值操作,得到:

Task \ Person1234
A900500
B002060
C45500
D015150

现在我们发现已经存在四个独立的零了:(B,1),(A,2),(C,3),(D,4),这就是最优的分配方案
因此,Kuhn-Munkre算法的步骤就是上面描述的过程:

  1. Row Reduction: 对cost矩阵的每一行,减去该行的最小值
  2. Column Reduction: 对每一列,减去该列的最小值
  3. 评估: 在cost矩阵上找出能覆盖所有零的最少数量的划线,如果划线数等于cost矩阵的边长n,那么已经存在最佳分配,退出算法;如果小于n,则继续4
  4. 设 h h h是所有不在划线中的单元的最小值,将所有不在划线中的单元减去 h h h,对划线的交点单元则加上 h h h
  5. 重复3,4

你可能会对步骤4有疑问,会不会步骤4一直产生不了最优解,导致算法陷入死循环呢? 其实步骤4的计算方式决定了算法一定会在有限步数内终止,证明如下:

  • 设 α \alpha α是步骤4中的减数,集合 I I I是行覆盖线集, J J J是列覆盖线集,在步骤4中,我们是先从 n ( n − ∣ I ∣ ) n(n-|I|) n(n−∣I∣)个单元中减 α \alpha α,然后给 n ∣ J ∣ n|J| n∣J∣个单元加上 α \alpha α,所以总的变化量是 α n ∣ J ∣ − α n ( n − ∣ I ∣ ) = α n ( ∣ I ∣ + ∣ J ∣ − n ) \alpha n|J|-\alpha n(n-|I|)=\alpha n(|I|+|J|-n) αn∣J∣−αn(n−∣I∣)=αn(∣I∣+∣J∣−n),而 ∣ I ∣ + ∣ J ∣ − n |I|+|J|-n ∣I∣+∣J∣−n是小于零的,所以总的变化量恒定为负,如果一直进行步骤4,cost矩阵的总和只会一直减小,也就意味着零的出现只会增多,算法一定会停止

对于算法的复杂度,研究者进行了详细的数学推导,证明该算法的复杂度是 n 3 n^3 n3,对比暴力搜索的 n ! n! n!,该算法的效率非常高

如何划线

算法的主体并不复杂,但是关键的划线步骤并没有描述细节,在上面的例子中,我们可以直观地找到最少数量的划线,但是怎么用一个算法来描述却并不简单,从复杂度上看,划线子算法要比主算法更难操作和理解,下面我结合一个例子来描述这个子算法

假设当前的cost矩阵为:

Task \ Person1234
A10505515
B002575
C35000
D00015
  1. 遍历cost矩阵中的每个零,如果在该零所属的行或列中不存在标 ∗ * ∗的零,那么就给它标上 ∗ * ∗;划出所有包含标 ∗ * ∗零的列 (标 ∗ * ∗零意味着相互独立,通过这个操作是初步找出了一些独立的零)

  1. 遍历每个未被线覆盖到的零,首先标上^,然后看它所在的行,如果该行有标 ∗ * ∗的零,对该行划线,同时移掉该行标 ∗ * ∗零所在列的线;如果该行没有标 ∗ * ∗零,则执行步骤3;重复步骤2,直到所有零都被线覆盖,退出子算法

根据步骤2的描述,首先找到单元格C4,对其标^,划下该行,移掉列2:


经过这个操作,单元格D3变成了一个未被覆盖的零,并且该行没有标 ∗ * ∗的零,所以需要进行步骤3的操作

  1. 构造由标^和标 ∗ * ∗零组成的序列:定义 Z 0 Z_0 Z0​是最新的那个标^零, Z 1 Z_1 Z1​是 Z 0 Z_0 Z0​所在列中的标 ∗ * ∗零, Z 2 Z_2 Z2​是 Z 1 Z_1 Z1​所在行中的标^零,以此类推,直到遇到用 Z 2 k Z_{2k} Z2k​表示的标^零,它所在列中不存在标 ∗ * ∗零. 对于这个序列上的零,我们移除掉所有标 ∗ * ∗零的 ∗ * ∗号,把标^零改为标 ∗ * ∗;注意到这个序列的首末两个零都是标^的,经过这个"反转"操作,标 ∗ * ∗零的数量多了一个;清空所有的^标记,清空覆盖线,划出所有包含标 ∗ * ∗零的列,回到步骤2

对于我们的例子,从单元格D3出发,我们可以找到序列 { Z D 3 , Z C 3 , Z C 4 } \{Z_{D3}, Z_{C3}, Z_{C4} \} {ZD3​,ZC3​,ZC4​}

"翻转"一下序列中的零:

现在可以看到已经没有未被覆盖的零了,而且划线数刚好是4,所以其实这个cost矩阵里已经包含了一个最优解。这个子算法一方面是在搜索划线,另一方面其实也是在找相互独立的最大数量的零,即标注 ∗ * ∗号的单元格

非平衡处理

上面的例子里开发者的数量和task数量相等,所以这是一个平衡分配问题,和传输问题一样,非平衡的情况更为多见,不过非平衡问题只需要在算法计算前对cost矩阵进行虚拟行或列的扩充就行了。
例如添加一个Task E,因为开发者只有4个,A到E的5个Task中必定有一个会空出来不被任何开发者领取,所以可以给cost矩阵添加一个cost为零的虚拟列,这对算法的计算没有任何影响

Task \ Person12345
A210901801600
B100701302000
C1751051401700
D80651051200
E951151201000

类似的,如果是开发者数量更多,则添加cost为零的虚拟行

代码实现

最后我写了一份python代码来实现上述的算法过程
定义了一个AdjacencyMatrixHungarian类(之所以用AdjacencyMatrix,是因为该算法可以看成是邻接矩阵形式的Hungarian算法),它的输入参数就是一个cost矩阵,在初始化时,判断是否需要进行非平衡处理;calculate方法是算法的主流程,而findMinCoverLines方法是划线搜索子算法的实现,该方法相对复杂,尤其是搜索翻转序列时会调用findAugmentSequence的方法,该方法采用递归的手段来搜索序列

import numpy as npclass AdjacencyMatrixHungarian:def __init__(self, costEdges):self.costEdges=costEdgesif costEdges.shape[0]!=costEdges.shape[1]:self.handleUnbalance()self.n=self.costEdges.shape[0]def handleUnbalance(self):m=self.costEdges.shape[0]n=self.costEdges.shape[1]if(m<n):dummy_row=np.zeros((1,n))self.costEdges=np.vstack((self.costEdges,dummy_row))elif(m>n):dummy_column=np.zeros((m,1))self.costEdges=np.hstack((self.costEdges,dummy_column))def calculate(self):self.rowReduction()self.columnReduction()print("Cost matrix after row and column reduction")print(self.costEdges)(coverRows,coverColumns,independentZeros)=self.findMinCoverLines()n1=len(coverRows)+len(coverColumns)while n1!=self.n:print("Haven't reach optimal, start iterate reduction")self.costEdges=self.iterateReduction(coverRows,coverColumns)print("Cost matrix after iterate reduction")print(self.costEdges)(coverRows,coverColumns,independentZeros)=self.findMinCoverLines()n1=len(coverRows)+len(coverColumns)return independentZerosdef rowReduction(self):for i in range(self.n):minValue=self.costEdges[i].min()for j in range(self.n):self.costEdges[i,j]=self.costEdges[i,j]-minValuedef columnReduction(self):for j in range(self.n):minValue=self.costEdges[:,j].min()for i in range(self.n):self.costEdges[i,j]=self.costEdges[i,j]-minValuedef iterateReduction(self, coverRows, coverColumns):minValue=float('inf')for i in range(self.n):for j in range(self.n):if i in coverRows or j in coverColumns:continueif self.costEdges[i,j]<minValue:minValue=self.costEdges[i,j]for i in range(self.n):if i in coverRows:continuefor j in range(self.n):self.costEdges[i,j]-=minValuefor j in range(self.n):if j in coverColumns:for i in range(self.n):self.costEdges[i,j]+=minValuereturn self.costEdgesdef findMinCoverLines(self):starMarks=np.zeros((self.n,self.n))primeMarks=np.zeros((self.n,self.n))coverRows=[]coverColumns=[]for i in range(self.n):for j in range(self.n):if self.costEdges[i,j]!=0:continueexistStarZeroThisRow=Falsefor k in range(self.n):if starMarks[i,k]==1:existStarZeroThisRow=TruebreakexistStarZeroThisColumn=Falsefor l in range(self.n):if starMarks[l,j]==1:existStarZeroThisColumn=Truebreakif not existStarZeroThisRow and not existStarZeroThisColumn:starMarks[i,j]=1coverColumns.append(j)while True:uncoverZeroCounter=0for i in range(self.n):for j in range(self.n):if self.costEdges[i,j]!=0:continueif i in coverRows or j in coverColumns:continueuncoverZeroCounter+=1primeMarks[i,j]=1noStarZeroThisRow=Truefor k in range(self.n):if starMarks[i,k]==1:coverColumns.remove(k)coverRows.append(i)noStarZeroThisRow=Falsebreakif not noStarZeroThisRow:continueelse:sequence=self.findAugmentSequence(starMarks, primeMarks, [(i,j)])for k in range(len(sequence)):if k%2==0:primeMarks[sequence[k][0],sequence[k][1]]=0starMarks[sequence[k][0],sequence[k][1]]=1else:starMarks[sequence[k][0],sequence[k][1]]=0primeMarks[:,:]=0coverRows=[]coverColumns=[]for j in range(self.n):for i in range(self.n):if starMarks[i,j]==1:coverColumns.append(j)continueif uncoverZeroCounter==0:breakindependentZeros=[]for i in range(self.n):for j in range(self.n):if starMarks[i,j]==1:independentZeros.append((i,j))return (coverRows,coverColumns,independentZeros)def findAugmentSequence(self, starMarks, primeMarks, sequence):if primeMarks[sequence[-1][0],sequence[-1][1]]==1:for i in range(self.n):if starMarks[i,sequence[-1][1]]==1:sequence.append((i,sequence[-1][1]))return self.findAugmentSequence(starMarks,primeMarks,sequence)return sequenceelif starMarks[sequence[-1][0],sequence[-1][1]]==1:for j in range(self.n):if primeMarks[sequence[-1][0],j]==1:sequence.append((sequence[-1][0],j))return self.findAugmentSequence(starMarks,primeMarks,sequence)return sequence

用下面的代码进行测试:

import numpy as np
from AdjacencyMatrixHungarian import AdjacencyMatrixHungariancostEdges=np.array([[210, 90, 180, 160],[100, 70, 130, 200],[175, 105,140, 170],[80, 65, 105, 120]
])
algorithm=AdjacencyMatrixHungarian(costEdges)
result=algorithm.calculate()
print("Optimal result")
print(result)

结果:

更多推荐

求解分配问题(一) Kuhn

本文发布于:2024-02-17 04:13:10,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1692586.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:分配   Kuhn

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!