I have two RDDs. The first, which I'll call userVisits, looks like this:
    ((123, someurl, Mon Nov 04 00:00:00 PST 2013), 11.0)

The second is allVisits:
    ((someurl, Mon Nov 04 00:00:00 PST 2013), 1122.0)

I can call userVisits.reduceByKey(_ + _) to get the number of visits by each user, and I can do the same with allVisits to get the totals. What I want is a weighted average for each user: the user's visits divided by the total visits for the day. To compute it, I need to look up a value in allVisits using part of the key tuple from userVisits. I'm guessing it could be done with a map like this:
    userVisits.reduceByKey(_ + _).map(item => item._2 / allVisits.get(item._1))

I know allVisits.get(key) doesn't exist, but how could I accomplish something like that?
The alternative is to take the keys from allVisits, map each userVisits record onto the matching key, and then join the two, but that seems inefficient.
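To make the target quantity concrete, here is a minimal sketch that uses plain Scala collections in place of the two RDDs. The values are the sample records shown above; representing the date as a string and the totals as a Map are assumptions made only for illustration.

```scala
// Plain Scala collections standing in for the two RDDs (sample values from the question).
// Dates are kept as strings here purely for illustration.
val userVisits = Seq(
  ((123, "someurl", "Mon Nov 04 00:00:00 PST 2013"), 11.0)
)
val allVisits = Map(
  ("someurl", "Mon Nov 04 00:00:00 PST 2013") -> 1122.0
)

// For each user record, look up the total stored under the (url, date) part of its key
// and divide the user's visits by that total.
val shares = userVisits.map { case ((id, url, date), visits) =>
  ((id, url, date), visits / allVisits((url, date)))
}

shares.foreach(println)
```

The lookup by (url, date) is the step that has no direct equivalent on an RDD, which is what the question is asking about.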
Recommended answer

The only universal option I see here is join:
    val userVisitsAgg = userVisits.reduceByKey(_ + _)
    val allVisitsAgg = allVisits.reduceByKey(_ + _)

    userVisitsAgg
      // Re-key by (url, date) so the keys match allVisitsAgg
      .map { case ((id, url, date), sum) => ((url, date), (id, sum)) }
      .join(allVisitsAgg)
      // Restore the original key and divide user visits by total visits
      .map { case ((url, date), ((id, userSum), urlSum)) =>
        ((id, url, date), userSum / urlSum)
      }

If allVisitsAgg is small enough to be broadcast, you can simplify the above to something like this:
    val allVisitsAggBD = sc.broadcast(allVisitsAgg.collectAsMap)

    userVisitsAgg.map { case ((id, url, date), sum) =>
      // Keep date in the key so the result matches the join version
      ((id, url, date), sum / allVisitsAggBD.value((url, date)))
    }
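For anyone who wants to try both variants side by side, the following is a minimal local-mode sketch, not a definitive implementation. The object name WeightedVisits, the local[*] master, the app name, and the sample records are all assumptions made for illustration, and dates are plain strings for brevity. The broadcast variant here uses flatMap with an Option lookup so that a key with no total is dropped, as an inner join would drop it, instead of throwing.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WeightedVisits {
  def main(args: Array[String]): Unit = {
    // Local master and app name are illustrative assumptions.
    val conf = new SparkConf().setMaster("local[*]").setAppName("weighted-visits")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample data; dates are plain strings for brevity.
      val userVisits = sc.parallelize(Seq(
        ((123, "someurl", "2013-11-04"), 5.0),
        ((123, "someurl", "2013-11-04"), 6.0),
        ((456, "someurl", "2013-11-04"), 4.0)
      ))
      val allVisits = sc.parallelize(Seq(
        (("someurl", "2013-11-04"), 1000.0),
        (("someurl", "2013-11-04"), 122.0)
      ))

      val userVisitsAgg = userVisits.reduceByKey(_ + _)
      val allVisitsAgg = allVisits.reduceByKey(_ + _)

      // Variant 1: re-key by (url, date) and join with the totals.
      val viaJoin = userVisitsAgg
        .map { case ((id, url, date), sum) => ((url, date), (id, sum)) }
        .join(allVisitsAgg)
        .map { case ((url, date), ((id, userSum), urlSum)) =>
          ((id, url, date), userSum / urlSum)
        }
        .collectAsMap()

      // Variant 2: broadcast the totals; flatMap drops keys that have no total,
      // mirroring the inner join above.
      val totals = sc.broadcast(allVisitsAgg.collectAsMap())
      val viaBroadcast = userVisitsAgg
        .flatMap { case ((id, url, date), sum) =>
          totals.value.get((url, date)).map(total => ((id, url, date), sum / total))
        }
        .collectAsMap()

      // Both variants should agree on this sample data.
      assert(viaJoin == viaBroadcast)
      viaJoin.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

The broadcast variant avoids a shuffle for the lookup, but it collects every total to the driver first, so it only makes sense when the number of distinct (url, date) pairs fits comfortably in memory.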