HBase co-processor: checkAndPut causing "Timed out on getting lock for row"

Updated: 2024-10-28 02:30:22


HBase version: 0.94.15-cdh4.7.0

I have a very simple setup:

- table ttt with the data
- table counters with a counter (an increment field)
- a prePut coprocessor for the ttt table

When a row is inserted or updated in ttt, the coprocessor checks whether a value is already present in column d:k for that row. If there is no value, the coprocessor increments the counter in the counters table and assigns the result to the d:k column via the checkAndPut method.

The code is as follows:

@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> observerContext,
                   final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException {
    HTable tableCounters = null;
    HTable tableTarget = null;
    try {
        Get existingEdwGet = new Get(put.getRow());
        existingEdwGet.addColumn("d".getBytes(), "k".getBytes());
        tableTarget = new HTable(this.configuration,
                observerContext.getEnvironment().getRegion().getTableDesc().getName());
        if (!tableTarget.exists(existingEdwGet)) {
            // increment the counter
            tableCounters = new HTable(this.configuration, "counters");
            long newEdwKey = tableCounters.incrementColumnValue("static_row".getBytes(),
                    "counters".getBytes(), "k".getBytes(), 1);
            Put keySetter = new Put(put.getRow());
            keySetter.add("d".getBytes(), "k".getBytes(), Bytes.toBytes(newEdwKey));
            tableTarget.checkAndPut(put.getRow(), "d".getBytes(), "k".getBytes(), null, keySetter);
        }
    } finally {
        releaseCloseable(tableTarget);
        releaseCloseable(tableCounters);
    }
}

Utility functions/variables:

- releaseCloseable - a simple .close() wrapped in try/catch
- this.configuration - the Hadoop Configuration obtained during coprocessor start
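The helper itself is not shown in the question; a minimal sketch of what releaseCloseable presumably looks like (an assumption: a null-safe close that swallows the IOException, so cleanup in a finally block cannot mask an earlier failure) could be:

```java
import java.io.Closeable;
import java.io.IOException;

public class CloseableUtil {

    // Quietly close a resource: null-safe, and any IOException from
    // close() is swallowed so the finally block cannot hide the
    // exception that actually aborted the coprocessor.
    public static void releaseCloseable(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            // intentionally ignored; nothing useful can be done here
        }
    }

    public static void main(String[] args) {
        // null is tolerated
        releaseCloseable(null);

        // a resource whose close() throws is also tolerated
        releaseCloseable(new Closeable() {
            @Override
            public void close() throws IOException {
                throw new IOException("boom");
            }
        });

        System.out.println("releaseCloseable handled both cases");
    }
}
```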

While executing simple PUTs from the hbase shell:

for i in 0..10 do
  put 'ttt', "hrow-#{i}", 'd:column', 'value'
end

the region server reports a lock timeout (in effect, a deadlock):

2015-07-02 23:58:30,297 ERROR org.apache.hadoop.hbase.regionserver.HRegionServer (IPC Server handler 43 on 60020):
java.io.IOException: Timed out on getting lock for row=hrow-1
    at org.apache.hadoop.hbase.regionserver.HRegion.internalObtainRowLock(HRegion.java:3588)
    at org.apache.hadoop.hbase.regionserver.HRegion.getLock(HRegion.java:3678)
    at org.apache.hadoop.hbase.regionserver.HRegion.getLock(HRegion.java:3662)
    at org.apache.hadoop.hbase.regionserver.HRegion.checkAndMutate(HRegion.java:2723)
    at org.apache.hadoop.hbase.regionserver.HRegionServer.checkAndMutate(HRegionServer.java:2307)
    at org.apache.hadoop.hbase.regionserver.HRegionServer.checkAndPut(HRegionServer.java:2345)
    at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:354)
    at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1434)

Questions:

- Is checkAndPut allowed to be executed from a prePut coprocessor?
- What else could be done to guarantee that, in a concurrent environment where multiple workers may write to the same ttt row, the d:k value is assigned only once?

Accepted answer


The actual issue was an infinite loop: the prePut coprocessor called .put or .checkAndPut, which in turn triggered the prePut coprocessor again.

To break the loop, I implemented the following approach:

- add a marker to the Put being created
- at the top of the coprocessor, check whether the marker is present. If yes, remove the marker and skip the rest of the coprocessor. If no, this is a new request that was not initiated by this coprocessor, so continue with the normal flow.

public static final byte[] DIM_FAMILY = "d".getBytes();
public static final byte[] COLUMN_KEY = "k".getBytes();
public static final byte[] COLUMN_MARKER = "marker".getBytes();
public static final byte[] VALUE_MARKER = "+".getBytes();
public static final TableName TABLE_COUNTERS = TableName.valueOf("counters");
public static final byte[] COUNTER_FAMILY = "c".getBytes();
public static final byte[] COUNTER_ROWKEY = "rowkey_counter".getBytes();
public static final byte[] COUNTER_KEY = "key_counter".getBytes();

public void prePut(final ObserverContext<RegionCoprocessorEnvironment> observerContext,
                   final Put put, final WALEdit edit, final Durability durability) throws IOException {
    if (put.has(DIM_FAMILY, COLUMN_MARKER)) {
        removeColumnMutations(put, COLUMN_MARKER);
        return; // return from the coprocessor; otherwise an infinite loop will occur
    }
    HRegion region = observerContext.getEnvironment().getRegion();
    Table tableCounters = null;
    Connection connectionCounters = null;
    try {
        // check whether the key column for the row is empty
        Get existingEdwGet = new Get(put.getRow());
        existingEdwGet.addColumn(DIM_FAMILY, COLUMN_KEY);
        List<Cell> existingEdwCells = region.get(existingEdwGet, false);
        // check if the key value is empty; if so - assign one immediately
        if (existingEdwCells.isEmpty()) {
            // increment the key_counter
            connectionCounters = ConnectionFactory.createConnection(configuration);
            tableCounters = connectionCounters.getTable(TABLE_COUNTERS);
            long newEdwKey = tableCounters.incrementColumnValue(COUNTER_ROWKEY,
                    COUNTER_FAMILY, COUNTER_KEY, 1);
            // form a PUT with the new key value and a marker,
            // showing that this insert should not be discarded
            Put keySetter = new Put(put.getRow());
            keySetter.addColumn(DIM_FAMILY, COLUMN_KEY, Bytes.toBytes(newEdwKey));
            keySetter.addColumn(DIM_FAMILY, COLUMN_MARKER, VALUE_MARKER);
            // consider the checkAndPut return value, and increment the Sequence Hole Number if needed
            boolean isNew = region.checkAndMutate(keySetter.getRow(), DIM_FAMILY, COLUMN_KEY,
                    CompareFilter.CompareOp.EQUAL, new BinaryComparator(null), keySetter, true);
        }
    } finally {
        releaseCloseable(tableCounters);
        releaseCloseable(connectionCounters);
    }
}

NOTES:

- The coprocessor above targets the HBase 1.0 SDK.
- Instead of opening a connection to the underlying region, the HRegion instance is taken from the RegionCoprocessorEnvironment context.
- The utility method removeColumnMutations can be omitted; its only purpose is removing the marker from the PUT.
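removeColumnMutations is not listed in the answer. In the HBase 1.0 API it would presumably walk Put.getFamilyCellMap() and drop every cell whose qualifier matches the marker (e.g. via CellUtil.matchingQualifier). The sketch below demonstrates that removal logic on plain java.util collections, with qualifier/value byte[] pairs standing in for HBase Cells so it runs without an HBase classpath; the names and layout are assumptions, not the answer author's code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RemoveColumnMutationsSketch {

    // Strip every pending cell with the given qualifier out of the
    // cell list of a Put. Each entry models one cell as a
    // {qualifier, value} pair of byte arrays.
    static void removeColumnMutations(List<byte[][]> cells, byte[] qualifier) {
        Iterator<byte[][]> it = cells.iterator();
        while (it.hasNext()) {
            if (Arrays.equals(it.next()[0], qualifier)) {
                it.remove(); // drop the marker cell, keep everything else
            }
        }
    }

    public static void main(String[] args) {
        List<byte[][]> cells = new ArrayList<>();
        cells.add(new byte[][] { "k".getBytes(), "42".getBytes() });
        cells.add(new byte[][] { "marker".getBytes(), "+".getBytes() });

        removeColumnMutations(cells, "marker".getBytes());

        // only the d:k cell should remain
        System.out.println(cells.size() + " cell(s) left: "
                + new String(cells.get(0)[0]));
    }
}
```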

Published: 2023-08-03 07:58:00.
Original link: https://www.elefans.com/category/jswz/34/1385316.html