redis源码解析(十一)hyperloglog算法源码分析

编程入门 行业动态 更新时间:2024-10-09 21:26:45

redis[源码](https://www.elefans.com/category/jswz/34/1770099.html)解析(十一)hyperloglog算法源码分析

redis源码解析(十一)hyperloglog算法源码分析

一. 简介

  HyperLogLog 是用来做基数统计的算法。它的优点是:在输入元素的数量或者体积非常大时,计算基数所需的空间总是固定的,并且很小。关于 HyperLogLog 算法实现原理的详细说明可以参考此文,讲解得很细致。

二. 源码分析

/* Convert the HLL with sparse representation given as input in its dense
 * representation. Both representations are represented by SDS strings, and
 * the input representation is freed as a side effect.
 *
 * The function returns C_OK if the sparse representation was valid,
 * otherwise C_ERR is returned if the representation was corrupted. On
 * C_ERR the object 'o' is left untouched.
 *
 * A corrupted payload may describe more than HLL_REGISTERS registers: every
 * run is bounds-checked before any dense register is written, so such input
 * is rejected instead of writing past the end of the dense buffer. */
int hllSparseToDense(robj *o) {
    sds sparse = o->ptr, dense;
    struct hllhdr *hdr, *oldhdr = (struct hllhdr*)sparse;
    int idx = 0, runlen, regval;
    uint8_t *p = (uint8_t*)sparse, *end = p+sdslen(sparse);

    /* If the representation is already the right one return ASAP. */
    hdr = (struct hllhdr*) sparse;
    if (hdr->encoding == HLL_DENSE) return C_OK;

    /* Create a string of the right size filled with zero bytes.
     * Note that the cached cardinality is set to 0 as a side effect
     * that is exactly the cardinality of an empty HLL. */
    dense = sdsnewlen(NULL,HLL_DENSE_SIZE);
    hdr = (struct hllhdr*) dense;
    *hdr = *oldhdr; /* This will copy the magic and cached cardinality. */
    hdr->encoding = HLL_DENSE;

    /* Now read the sparse representation and set non-zero registers
     * accordingly. The 'idx <= HLL_REGISTERS' condition stops the scan as
     * soon as the opcodes cover too many registers, which also keeps 'idx'
     * from overflowing on arbitrarily long corrupted input. */
    p += HLL_HDR_SIZE;
    while(p < end && idx <= HLL_REGISTERS) {
        if (HLL_SPARSE_IS_ZERO(p)) {
            runlen = HLL_SPARSE_ZERO_LEN(p);
            idx += runlen;
            p++;
        } else if (HLL_SPARSE_IS_XZERO(p)) {
            runlen = HLL_SPARSE_XZERO_LEN(p);
            idx += runlen;
            p += 2;
        } else {
            runlen = HLL_SPARSE_VAL_LEN(p);
            regval = HLL_SPARSE_VAL_VALUE(p);
            /* Refuse runs that would write registers beyond the last one. */
            if (runlen + idx > HLL_REGISTERS) {
                sdsfree(dense);
                return C_ERR;
            }
            while(runlen--) {
                HLL_DENSE_SET_REGISTER(hdr->registers,idx,regval);
                idx++;
            }
            p++;
        }
    }

    /* If the sparse representation was valid, we expect to find idx
     * set to HLL_REGISTERS. */
    if (idx != HLL_REGISTERS) {
        sdsfree(dense);
        return C_ERR;
    }

    /* Free the old representation and set the new one. */
    sdsfree(o->ptr);
    o->ptr = dense;
    return C_OK;
}

/* Low level function to set the sparse HLL register at 'index' to the
 * specified value if the current value is smaller than 'count'. Doing so
 * may require rewriting ZERO, XZERO and VAL opcodes, and may convert the
 * representation from sparse to dense.
 *
 * The object 'o' is the String object holding the HLL. The function requires
 * a reference to the object in order to be able to enlarge the string if
 * needed.
 *
 * On success, the function returns 1 if the cardinality changed, or 0
 * if the register for this element was not updated.
 * On error (if the representation is invalid) -1 is returned.
 *
 * As a side effect the function may promote the HLL representation from
 * sparse to dense: this happens when a register requires to be set to a value
 * not representable with the sparse representation, or when the resulting
 * size would be greater than server.hll_sparse_max_bytes. */
int hllSparseSet(robj *o, long index, uint8_t count) {
    /* Parameters: 'o' is the String object holding the sparse HLL, 'index'
     * the register to update, 'count' the candidate new register value.
     * Returns 1 if a register was raised, 0 if the stored value was already
     * >= 'count', -1 if the sparse payload is malformed. */
    struct hllhdr *hdr;
    uint8_t oldcount, *sparse, *end, *p, *prev, *next;
    long first, span;
    long is_zero = 0, is_xzero = 0, is_val = 0, runlen = 0;

    /* If the count is too big to be representable by the sparse representation
     * switch to dense representation. */
    if (count > HLL_SPARSE_VAL_MAX_VALUE) goto promote;

    /* When updating a sparse representation, sometimes we may need to
     * enlarge the buffer for up to 3 bytes in the worst case (XZERO split
     * into XZERO-VAL-XZERO). Make sure there is enough space right now
     * so that the pointers we take during the execution of the function
     * will be valid all the time. */
    o->ptr = sdsMakeRoomFor(o->ptr,3);

    /* Step 1: we need to locate the opcode we need to modify to check
     * if a value update is actually needed. */
    sparse = p = ((uint8_t*)o->ptr) + HLL_HDR_SIZE;
    end = p + sdslen(o->ptr) - HLL_HDR_SIZE;

    first = 0;
    prev = NULL; /* Points to previous opcode at the end of the loop. */
    next = NULL; /* Points to the next opcode at the end of the loop. */
    span = 0;
    while(p < end) {
        long oplen;

        /* Set span to the number of registers covered by this opcode.
         *
         * This is the most performance critical loop of the sparse
         * representation. Sorting the conditionals from the most to the
         * least frequent opcode in many-bytes sparse HLLs is faster. */
        oplen = 1;
        if (HLL_SPARSE_IS_ZERO(p)) {
            span = HLL_SPARSE_ZERO_LEN(p);
        } else if (HLL_SPARSE_IS_VAL(p)) {
            span = HLL_SPARSE_VAL_LEN(p);
        } else { /* XZERO. */
            span = HLL_SPARSE_XZERO_LEN(p);
            oplen = 2;
        }
        /* Break if this opcode covers the register as 'index'. */
        if (index <= first+span-1) break;
        prev = p;
        p += oplen;
        first += span;
    }
    /* Invalid format: either there are no opcodes at all, or the opcodes
     * ran out before reaching 'index'. In the latter case 'p' points at
     * 'end', so going on would read and rewrite bytes that are not part
     * of the payload. */
    if (span == 0 || p >= end) return -1;

    next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1;
    if (next >= end) next = NULL;

    /* Cache current opcode type to avoid using the macro again and
     * again for something that will not change.
     * Also cache the run-length of the opcode. */
    if (HLL_SPARSE_IS_ZERO(p)) {
        is_zero = 1;
        runlen = HLL_SPARSE_ZERO_LEN(p);
    } else if (HLL_SPARSE_IS_XZERO(p)) {
        is_xzero = 1;
        runlen = HLL_SPARSE_XZERO_LEN(p);
    } else {
        is_val = 1;
        runlen = HLL_SPARSE_VAL_LEN(p);
    }

    /* Step 2: After the loop:
     *
     * 'first' stores to the index of the first register covered
     *  by the current opcode, which is pointed by 'p'.
     *
     * 'next' and 'prev' store respectively the next and previous opcode,
     *  or NULL if the opcode at 'p' is respectively the last or first.
     *
     * 'span' is set to the number of registers covered by the current
     *  opcode.
     *
     * There are different cases in order to update the data structure
     * in place without generating it from scratch:
     *
     * A) If it is a VAL opcode already set to a value >= our 'count'
     *    no update is needed, regardless of the VAL run-length field.
     *    In this case PFADD returns 0 since no changes are performed.
     *
     * B) If it is a VAL opcode with len = 1 (representing only our
     *    register) and the value is less than 'count', we just update it
     *    since this is a trivial case. */
    if (is_val) {
        oldcount = HLL_SPARSE_VAL_VALUE(p);
        /* Case A. */
        if (oldcount >= count) return 0;

        /* Case B. */
        if (runlen == 1) {
            HLL_SPARSE_VAL_SET(p,count,1);
            goto updated;
        }
    }

    /* C) Another trivial to handle case is a ZERO opcode with a len of 1.
     * We can just replace it with a VAL opcode with our value and len of 1. */
    if (is_zero && runlen == 1) {
        HLL_SPARSE_VAL_SET(p,count,1);
        goto updated;
    }

    /* D) General case.
     *
     * The other cases are more complex: our register requires to be updated
     * and is either currently represented by a VAL opcode with len > 1,
     * by a ZERO opcode with len > 1, or by an XZERO opcode.
     *
     * In those cases the original opcode must be split into multiple
     * opcodes. The worst case is an XZERO split in the middle resulting into
     * XZERO - VAL - XZERO, so the resulting sequence max length is
     * 5 bytes.
     *
     * We perform the split writing the new sequence into the 'seq' buffer,
     * with 'n' pointing one past the last byte written. Later the new
     * sequence is inserted in place of the old one, possibly moving what is
     * on the right a few bytes if the new sequence is longer than the older
     * one. */
    uint8_t seq[5], *n = seq;
    int last = first+span-1; /* Last register covered by the sequence. */
    int len;

    if (is_zero || is_xzero) {
        /* Handle splitting of ZERO / XZERO. */
        if (index != first) {
            len = index-first;
            if (len > HLL_SPARSE_ZERO_MAX_LEN) {
                HLL_SPARSE_XZERO_SET(n,len);
                n += 2;
            } else {
                HLL_SPARSE_ZERO_SET(n,len);
                n++;
            }
        }
        HLL_SPARSE_VAL_SET(n,count,1);
        n++;
        if (index != last) {
            len = last-index;
            if (len > HLL_SPARSE_ZERO_MAX_LEN) {
                HLL_SPARSE_XZERO_SET(n,len);
                n += 2;
            } else {
                HLL_SPARSE_ZERO_SET(n,len);
                n++;
            }
        }
    } else {
        /* Handle splitting of VAL. */
        int curval = HLL_SPARSE_VAL_VALUE(p);

        if (index != first) {
            len = index-first;
            HLL_SPARSE_VAL_SET(n,curval,len);
            n++;
        }
        HLL_SPARSE_VAL_SET(n,count,1);
        n++;
        if (index != last) {
            len = last-index;
            HLL_SPARSE_VAL_SET(n,curval,len);
            n++;
        }
    }

    /* Step 3: substitute the old opcode with the new sequence.
     *
     * Note that we already allocated space on the sds string
     * calling sdsMakeRoomFor(). */
    int seqlen = n-seq;
    int oldlen = is_xzero ? 2 : 1;
    int deltalen = seqlen-oldlen;

    if (deltalen > 0 &&
        sdslen(o->ptr)+deltalen > server.hll_sparse_max_bytes) goto promote;
    if (deltalen && next) memmove(next+deltalen,next,end-next);
    sdsIncrLen(o->ptr,deltalen);
    memcpy(p,seq,seqlen);
    end += deltalen;

updated:
    /* Step 4: Merge adjacent values if possible.
     *
     * The representation was updated, however the resulting representation
     * may not be optimal: adjacent VAL opcodes can sometimes be merged into
     * a single one. */
    p = prev ? prev : sparse;
    int scanlen = 5; /* Scan up to 5 opcodes starting from prev. */
    while (p < end && scanlen--) {
        if (HLL_SPARSE_IS_XZERO(p)) {
            p += 2;
            continue;
        } else if (HLL_SPARSE_IS_ZERO(p)) {
            p++;
            continue;
        }
        /* We need two adjacent VAL opcodes to try a merge, having
         * the same value, and a len that fits the VAL opcode max len. */
        if (p+1 < end && HLL_SPARSE_IS_VAL(p+1)) {
            int v1 = HLL_SPARSE_VAL_VALUE(p);
            int v2 = HLL_SPARSE_VAL_VALUE(p+1);
            if (v1 == v2) {
                int len = HLL_SPARSE_VAL_LEN(p)+HLL_SPARSE_VAL_LEN(p+1);
                if (len <= HLL_SPARSE_VAL_MAX_LEN) {
                    HLL_SPARSE_VAL_SET(p+1,v1,len);
                    memmove(p,p+1,end-p);
                    sdsIncrLen(o->ptr,-1);
                    end--;
                    /* After a merge we reiterate without incrementing 'p'
                     * in order to try to merge the just merged value with
                     * a value on its right. */
                    continue;
                }
            }
        }
        p++;
    }

    /* Invalidate the cached cardinality. */
    hdr = o->ptr;
    HLL_INVALIDATE_CACHE(hdr);
    return 1;

promote: /* Promote to dense representation. */
    if (hllSparseToDense(o) == C_ERR) return -1; /* Corrupted HLL. */
    hdr = o->ptr;

    /* We need to call hllDenseSet() to perform the operation after the
     * conversion. However the result must be 1, since if we need to
     * convert from sparse to dense a register requires to be updated.
     *
     * Note that this in turn means that PFADD will make sure the command
     * is propagated to slaves / AOF, so if there is a sparse -> dense
     * conversion, it will be performed in all the slaves as well. */
    int dense_retval = hllDenseSet(hdr->registers,index,count);
    serverAssert(dense_retval == 1);
    return dense_retval;
}

/* "Add" the element in the sparse hyperloglog data structure.
 * Actually nothing is added, but the max 0 pattern counter of the subset
 * the element belongs to is incremented if needed.
 *
 * This function is actually a wrapper for hllSparseSet(), it only performs
 * the hashing of the element to obtain the index and zeros run length. */
int hllSparseAdd(robj *o, unsigned char *ele, size_t elesize) {
    long reg;          /* Register index, filled in by hllPatLen(). */
    uint8_t zeros_run; /* Candidate register value for this element. */

    zeros_run = hllPatLen(ele,elesize,&reg);

    /* The register is only raised if this element produced a longer run
     * of zeroes; hllSparseSet() takes care of that comparison and its
     * return value is passed through unchanged. */
    return hllSparseSet(o,reg,zeros_run);
}

/* Compute SUM(2^-reg) in the sparse representation.
 * PE is an array with a pre-computed table of values 2^-reg indexed by reg.
 * As a side effect the integer pointed by 'ezp' is set to the number
 * of zero registers. If 'invalid' is not NULL and the opcodes do not cover
 * exactly HLL_REGISTERS registers, the integer it points to is set to 1. */
double hllSparseSum(uint8_t *sparse, int sparselen, double *PE, int *ezp, int *invalid) {
    double sum = 0;
    int zeros = 0;   /* Registers whose value is zero. */
    int covered = 0; /* Registers described by the opcodes seen so far. */
    uint8_t *cur = sparse;
    uint8_t *stop = sparse+sparselen;

    while (cur < stop) {
        int span;

        if (HLL_SPARSE_IS_ZERO(cur)) {
            /* One-byte run of zero registers: accounted for in 'zeros',
             * added to the sum after the loop. */
            span = HLL_SPARSE_ZERO_LEN(cur);
            zeros += span;
            cur += 1;
        } else if (HLL_SPARSE_IS_XZERO(cur)) {
            /* Two-byte run of zero registers. */
            span = HLL_SPARSE_XZERO_LEN(cur);
            zeros += span;
            cur += 2;
        } else {
            /* VAL: 'span' registers all holding the same value. */
            int value;

            span = HLL_SPARSE_VAL_LEN(cur);
            value = HLL_SPARSE_VAL_VALUE(cur);
            sum += PE[value]*span;
            cur += 1;
        }
        covered += span;
    }

    /* A valid payload describes exactly HLL_REGISTERS registers. */
    if (invalid && covered != HLL_REGISTERS) *invalid = 1;

    sum += zeros; /* Add 2^0 'zeros' times. */
    *ezp = zeros;
    return sum;
}
/* ========================= HyperLogLog Count ==============================
 * This is the core of the algorithm where the approximated count is computed.
 * The function uses the lower level hllDenseSum() and hllSparseSum() functions
 * as helpers to compute the SUM(2^-reg) part of the computation, which is
 * representation-specific, while all the rest is common. */

/* Implements the SUM operation for uint8_t data type which is only used
 * internally as speedup for PFCOUNT with multiple keys.
 *
 * 'registers' holds HLL_REGISTERS one-byte register values, 'PE' is the
 * table of 2^-reg values indexed by reg. The number of zero registers is
 * stored in '*ezp' and SUM(2^-reg) over all registers is returned.
 *
 * The registers are examined in groups of 8 so that a fully empty group is
 * accounted for in one step. The group test is done on the bytes themselves
 * rather than by reading the buffer through a uint64_t pointer: that would
 * require the buffer to be 8-byte aligned and would access uint8_t objects
 * through an incompatible type. */
double hllRawSum(uint8_t *registers, double *PE, int *ezp) {
    double E = 0;
    int j, k, ez = 0;
    const uint8_t *bytes = registers;

    for (j = 0; j < HLL_REGISTERS/8; j++, bytes += 8) {
        if ((bytes[0] | bytes[1] | bytes[2] | bytes[3] |
             bytes[4] | bytes[5] | bytes[6] | bytes[7]) == 0)
        {
            ez += 8;
            continue;
        }
        /* Accumulate in register order so the floating point sum is built
         * with the same sequence of additions for a given input. */
        for (k = 0; k < 8; k++) {
            if (bytes[k]) E += PE[bytes[k]]; else ez++;
        }
    }
    E += ez; /* 2^(-reg[j]) is 1 when reg[j] is 0, add it 'ez' times for
                every zero register in the HLL. */
    *ezp = ez;
    return E;
}

/* Return the approximated cardinality of the set based on the harmonic
 * mean of the registers values. 'hdr' points to the start of the SDS
 * representing the String object holding the HLL representation.
 *
 * If the sparse representation of the HLL object is not valid, the integer
 * pointed by 'invalid' is set to non-zero, otherwise it is left untouched.
 *
 * hllCount() supports a special internal-only encoding of HLL_RAW, that
 * is, hdr->registers will point to an uint8_t array of HLL_REGISTERS element.
 * This is useful in order to speedup PFCOUNT when called against multiple
 * keys (no need to work with 6-bit integers encoding). */
uint64_t hllCount(struct hllhdr *hdr, int *invalid) {double m = HLL_REGISTERS;double E, alpha = 0.7213/(1+1.079/m);int j, ez; /* Number of registers equal to 0. *//* We precompute 2^(-reg[j]) in a small table in order to* speedup the computation of SUM(2^-register[0..i]). */static int initialized = 0;static double PE[64];if (!initialized) {PE[0] = 1; /* 2^(-reg[j]) is 1 when m is 0. */for (j = 1; j < 64; j++) {/* 2^(-reg[j]) is the same as 1/2^reg[j]. */PE[j] = 1.0/(1ULL << j);}initialized = 1;}/* Compute SUM(2^-register[0..i]). */if (hdr->encoding == HLL_DENSE) {E = hllDenseSum(hdr->registers,PE,&ez);} else if (hdr->encoding == HLL_SPARSE) {E = hllSparseSum(hdr->registers,sdslen((sds)hdr)-HLL_HDR_SIZE,PE,&ez,invalid);} else if (hdr->encoding == HLL_RAW) {E = hllRawSum(hdr->registers,PE,&ez);} else {serverPanic("Unknown HyperLogLog encoding in hllCount()");}/* Apply loglog-beta to the raw estimate. See:* "LogLog-Beta and More: A New Algorithm for Cardinality Estimation* Based on LogLog Counting" Jason Qin, Denys Kim, Yumei Tung* arXiv:1612.02284 */double zl = log(ez + 1);double beta = -0.370393911*ez +0.070471823*zl +0.17393686*pow(zl,2) +0.16339839*pow(zl,3) +-0.09237745*pow(zl,4) +0.03738027*pow(zl,5) +-0.005384159*pow(zl,6) +0.00042419*pow(zl,7);E  = llroundl(alpha*m*(m-ez)*(1/(E+beta)));return (uint64_t) E;
}/* Call hllDenseAdd() or hllSparseAdd() according to the HLL encoding. */
int hllAdd(robj *o, unsigned char *ele, size_t elesize) {
    struct hllhdr *hdr = o->ptr;

    /* Dense encoding: the add works directly on the register array. */
    if (hdr->encoding == HLL_DENSE)
        return hllDenseAdd(hdr->registers,ele,elesize);

    /* Sparse encoding: the add needs the object itself, not just the
     * registers. */
    if (hdr->encoding == HLL_SPARSE)
        return hllSparseAdd(o,ele,elesize);

    return -1; /* Invalid representation. */
}

/* Merge by computing MAX(registers[i],hll[i]) the HyperLogLog 'hll'
 * with an array of uint8_t HLL_REGISTERS registers pointed by 'max'.
 *
 * The hll object must be already validated via isHLLObjectOrReply()
 * or in some other way.
 *
 * If the HyperLogLog is sparse and is found to be invalid, C_ERR
 * is returned, otherwise the function always succeeds. */
int hllMerge(uint8_t *max, robj *hll) {
    /* 'max' is an array of HLL_REGISTERS one-byte registers that is updated
     * in place with MAX(max[i], register i of 'hll'). Returns C_OK on
     * success, C_ERR if a sparse payload does not describe exactly
     * HLL_REGISTERS registers; in that case 'max' may already have been
     * partially updated. */
    struct hllhdr *hdr = hll->ptr;
    int i;

    if (hdr->encoding == HLL_DENSE) {
        uint8_t val;

        for (i = 0; i < HLL_REGISTERS; i++) {
            HLL_DENSE_GET_REGISTER(val,hdr->registers,i);
            if (val > max[i]) max[i] = val;
        }
    } else {
        uint8_t *p = hll->ptr, *end = p + sdslen(hll->ptr);
        long runlen, regval;

        p += HLL_HDR_SIZE;
        i = 0;
        /* Stop scanning as soon as the opcodes cover more registers than
         * exist: this bounds 'i' even for arbitrarily long corrupted
         * input. */
        while(p < end && i <= HLL_REGISTERS) {
            if (HLL_SPARSE_IS_ZERO(p)) {
                runlen = HLL_SPARSE_ZERO_LEN(p);
                i += runlen;
                p++;
            } else if (HLL_SPARSE_IS_XZERO(p)) {
                runlen = HLL_SPARSE_XZERO_LEN(p);
                i += runlen;
                p += 2;
            } else {
                runlen = HLL_SPARSE_VAL_LEN(p);
                regval = HLL_SPARSE_VAL_VALUE(p);
                /* A run extending past the last register would index
                 * 'max' out of bounds: treat it as a corrupted HLL. */
                if (runlen + i > HLL_REGISTERS) return C_ERR;
                while(runlen--) {
                    if (regval > max[i]) max[i] = regval;
                    i++;
                }
                p++;
            }
        }
        if (i != HLL_REGISTERS) return C_ERR;
    }
    return C_OK;
}

更多推荐

redis源码解析(十一)hyperloglog算法源码分析

本文发布于:2024-02-24 21:14:17,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1696672.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:源码   算法   redis   hyperloglog

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!