[TRACE] org.apache.hadoop.hbase.util.HMerge

I previously introduced one of HBase's merge tools: org.apache.hadoop.hbase.util.Merge.
That tool comes with many restrictions,
most notably that the cluster must be offline, which makes it hard to use in a production environment.

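This time I introduce another merge tool: org.apache.hadoop.hbase.util.HMerge.
org.apache.hadoop.hbase.util.HMerge (HMerge) has two modes:
an online merge for ordinary tables, and an offline merge for the .META. table.
Put simply, HMerge provides the opposite of auto split:
it checks adjacent regions and merges them if they are too small.
Undersized regions typically appear after a large amount of data has been deleted.
A table must be disabled before it is merged, so that data is not corrupted while the merge is running.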

Here is the API description of HMerge:

public static void merge(Configuration conf,
         FileSystem fs,
         byte[] tableName,
         boolean testMasterRunning)
                  throws java.io.IOException

Scans the table and merges two adjacent regions if they are small. This only happens when a lot of rows are deleted. When merging the META region, the HBase instance must be offline. When merging a normal table, the HBase instance must be online, but the table must be disabled.

Parameters:
conf - configuration object for HBase
fs - FileSystem where regions reside
tableName - Table to be compacted
testMasterRunning - True if we are to verify master is down before running merge

Throws:
java.io.IOException

Next, let's trace the code of org.apache.hadoop.hbase.util.HMerge:

    public static void merge(Configuration conf, FileSystem fs, final byte[] tableName,
                             final boolean testMasterRunning) throws IOException {
        boolean masterIsRunning = false;
        if (testMasterRunning) {
            HConnection connection = HConnectionManager.getConnection(conf);
            masterIsRunning = connection.isMasterRunning();
        }
        HConnectionManager.deleteConnection(conf, true);
        if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
            if (masterIsRunning) {
                throw new IllegalStateException("Can not compact META table if instance is on-line");
            }
            new OfflineMerger(conf, fs).process();
        } else {
            if (!masterIsRunning) {
                throw new IllegalStateException(
                    "HBase instance must be running to merge a normal table");
            }
            //      HBaseAdmin admin = new HBaseAdmin(conf);
            //      if (!admin.isTableDisabled(tableName)) {
            //        throw new TableNotDisabledException(tableName);
            //      }
            new OnlineMerger(conf, fs, tableName).process();
        }
    }

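First, HMerge checks whether the Master is running (the check only happens when testMasterRunning is true; otherwise masterIsRunning stays false).
If the Master is running, only an ordinary table can be merged, via OnlineMerger.
The OnlineMerger constructor stores fs, tableName, and the HBase conf, and opens a scanner over the .META. table: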

        OnlineMerger(Configuration conf, FileSystem fs, final byte[] tableName) throws IOException {
            super(conf, fs, tableName);
            this.tableName = tableName;
            this.table = new HTable(conf, HConstants.META_TABLE_NAME);
            this.metaScanner = table.getScanner(HConstants.CATALOG_FAMILY,
                HConstants.REGIONINFO_QUALIFIER);
            this.latestRegion = null;
        }
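
To summarize the branching in merge() shown earlier, here is a minimal sketch with no HBase dependency. The class MergeModeSketch, the enum Mode, and the method chooseMode are hypothetical names introduced only for illustration; the two boolean inputs stand in for the Bytes.equals(tableName, HConstants.META_TABLE_NAME) check and the masterIsRunning flag.

```java
// Hypothetical sketch: mirrors the branching of HMerge.merge() without HBase classes.
class MergeModeSketch {
    enum Mode { OFFLINE_META_MERGE, ONLINE_TABLE_MERGE }

    // isMetaTable stands in for the META_TABLE_NAME comparison,
    // masterIsRunning for the flag computed at the top of merge().
    static Mode chooseMode(boolean isMetaTable, boolean masterIsRunning) {
        if (isMetaTable) {
            if (masterIsRunning) {
                throw new IllegalStateException(
                    "Can not compact META table if instance is on-line");
            }
            return Mode.OFFLINE_META_MERGE;
        }
        if (!masterIsRunning) {
            throw new IllegalStateException(
                "HBase instance must be running to merge a normal table");
        }
        return Mode.ONLINE_TABLE_MERGE;
    }

    public static void main(String[] args) {
        System.out.println(chooseMode(false, true));  // ONLINE_TABLE_MERGE
        System.out.println(chooseMode(true, false));  // OFFLINE_META_MERGE
    }
}
```

One consequence visible in the listing: because masterIsRunning is only set when testMasterRunning is true, calling merge() on an ordinary table with testMasterRunning set to false ends in the "must be running" exception.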

The process() method invoked on OnlineMerger is inherited from the Merger class:

        void process() throws IOException {
            try {
                for (HRegionInfo[] regionsToMerge = next(); regionsToMerge != null; regionsToMerge = next()) {
                    if (!merge(regionsToMerge)) {
                        return;
                    }
                }
            } finally {
                try {
                    hlog.closeAndDelete();

                } catch (IOException e) {
                    LOG.error(e);
                }
            }
        }


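In OnlineMerger, next() returns the next pair of adjacent regions of the target table, not a single region.
To do so it reads rows from the .META. table
and checks whether each row belongs to the target table, skipping rows that do not or that carry no HRegionInfo.
Reading continues until a matching region is found or the .META. scanner runs out of rows.
The regions obtained this way are then passed to merge(). The relevant methods are: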

        private HRegionInfo nextRegion() throws IOException {
            try {
                HRegionInfo results = getMetaRow();

                return results;
            } catch (IOException e) {
                e = RemoteExceptionHandler.checkIOException(e);
                LOG.error("meta scanner error", e);
                metaScanner.close();
                throw e;
            }
        }

        /*
         * Check current row has a HRegionInfo.  Skip to next row if HRI is empty.
         * @return A Map of the row content else null if we are off the end.
         * @throws IOException
         */
        private HRegionInfo getMetaRow() throws IOException {

            Result currentRow = metaScanner.next();
            boolean found = false;
            HRegionInfo region = null;
            while (currentRow != null) {
                LOG.info("Row: <" + Bytes.toString(currentRow.getRow()) + ">");
                byte[] regionInfoValue = currentRow.getValue(HConstants.CATALOG_FAMILY,
                    HConstants.REGIONINFO_QUALIFIER);

                if (regionInfoValue == null || regionInfoValue.length == 0) {

                    currentRow = metaScanner.next();
                    continue;
                } else {
                    region = Writables.getHRegionInfo(regionInfoValue);
                    if (!Bytes.equals(region.getTableDesc().getName(), this.tableName)) {
                        currentRow = metaScanner.next();
                        continue;
                    }
                }

                found = true;
                break;
            }
            return found ? region : null;
        }

        @Override
        protected HRegionInfo[] next() throws IOException {
            List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
            if (latestRegion == null) {
                latestRegion = nextRegion();
            }
            if (latestRegion != null) {
                regions.add(latestRegion);
            }
            latestRegion = nextRegion();
            if (latestRegion != null) {
                regions.add(latestRegion);
            }
            return regions.toArray(new HRegionInfo[regions.size()]);
        }
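
To make the pairing done by next() concrete, here is a minimal sketch that replaces the .META. scanner with a plain iterator over region names. The class PairScanSketch and its members are hypothetical stand-ins (scanner for metaScanner, latest for latestRegion); only the control flow is taken from the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the sliding pair produced by OnlineMerger.next().
class PairScanSketch {
    private final Iterator<String> scanner; // stands in for metaScanner
    private String latest;                  // stands in for latestRegion

    PairScanSketch(List<String> regionNames) {
        this.scanner = regionNames.iterator();
    }

    // Returns the next region name, or null when the scan is exhausted.
    private String nextRegion() {
        return scanner.hasNext() ? scanner.next() : null;
    }

    // Same shape as next(): reuse the region kept from the last call, then read one more.
    String[] next() {
        List<String> pair = new ArrayList<String>();
        if (latest == null) {
            latest = nextRegion();
        }
        if (latest != null) {
            pair.add(latest);
        }
        latest = nextRegion();
        if (latest != null) {
            pair.add(latest);
        }
        return pair.toArray(new String[0]);
    }

    public static void main(String[] args) {
        PairScanSketch s = new PairScanSketch(Arrays.asList("r1", "r2", "r3"));
        System.out.println(Arrays.toString(s.next())); // [r1, r2]
        System.out.println(Arrays.toString(s.next())); // [r2, r3]
        System.out.println(Arrays.toString(s.next())); // [r3]
    }
}
```

The last call returns a one-element array rather than null, so in process() the loop actually ends when merge() sees fewer than two regions and returns false.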

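Here, the region information is passed to merge() as an array.
As the code of next() shows, the array holds at most two regions: the one kept from the previous call (latestRegion) and the one just read, that is, a pair of adjacent regions of the target table.
merge() first checks the number of regions and does nothing if there are fewer than two.
Otherwise it takes the two adjacent regions and, if the sum of their sizes (as reported by getLargestHStoreSize()) is at most maxFilesize/2,
merges them. Here maxFilesize is the threshold that triggers auto splitting: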

        protected boolean merge(final HRegionInfo[] info) throws IOException {
            if (info.length < 2) {
                LOG.info("only one region - nothing to merge");
                return false;
            }

            HRegion currentRegion = null;
            long currentSize = 0;
            HRegion nextRegion = null;
            long nextSize = 0;
            for (int i = 0; i < info.length - 1; i++) {
                if (currentRegion == null) {
                    currentRegion = HRegion.newHRegion(tabledir, hlog, fs, conf, info[i], null);
                    currentRegion.initialize();
                    currentSize = currentRegion.getLargestHStoreSize();
                }
                nextRegion = HRegion.newHRegion(tabledir, hlog, fs, conf, info[i + 1], null);
                nextRegion.initialize();
                nextSize = nextRegion.getLargestHStoreSize();

                if ((currentSize + nextSize) <= (maxFilesize / 2)) {
                    // We merge two adjacent regions if their total size is less than
                    // one half of the desired maximum size
                    LOG.info("Merging regions " + currentRegion.getRegionNameAsString() + " and "
                             + nextRegion.getRegionNameAsString());
                    HRegion mergedRegion = HRegion.mergeAdjacent(currentRegion, nextRegion);
                    updateMeta(currentRegion.getRegionName(), nextRegion.getRegionName(),
                        mergedRegion);
                    break;
                }
                LOG.info("not merging regions " + Bytes.toString(currentRegion.getRegionName())
                         + " and " + Bytes.toString(nextRegion.getRegionName()));
                currentRegion.close();
                currentRegion = nextRegion;
                currentSize = nextSize;
            }
            if (currentRegion != null) {
                currentRegion.close();
            }
            return true;
        }
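
The size test in merge() can be isolated as a small predicate. The sketch below is only an illustration: the class MergeThresholdSketch and the method shouldMerge are hypothetical names, and the 256 MB value for maxFileSize is an assumed number chosen for the example, not a value taken from the HBase source.

```java
// Hypothetical sketch of the size criterion used in merge().
class MergeThresholdSketch {
    // True when two adjacent regions qualify for merging:
    // their combined size is at most half of the split threshold.
    static boolean shouldMerge(long currentSize, long nextSize, long maxFileSize) {
        return (currentSize + nextSize) <= (maxFileSize / 2);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long maxFileSize = 256 * mb; // assumed split threshold, for illustration only

        // 40 MB + 60 MB = 100 MB, which is within the 128 MB limit.
        System.out.println(shouldMerge(40 * mb, 60 * mb, maxFileSize));  // true
        // 100 MB + 60 MB = 160 MB, which exceeds the 128 MB limit.
        System.out.println(shouldMerge(100 * mb, 60 * mb, maxFileSize)); // false
    }
}
```

Using half of the split threshold as the limit leaves headroom, so a freshly merged region is not immediately a candidate for splitting again.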

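When a merge is performed, HRegion.mergeAdjacent() is called first
to produce a new mergedRegion; for that part of the code, see the article on HRegion (0.94).
After that, updateMeta() is called to update the .META. table.
It deletes the rows of the two old regions from .META.
and then puts the row for the new region (mergedRegion) into the .META. table.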

    @Override
    protected void updateMeta(final byte [] oldRegion1,
        final byte [] oldRegion2,
        HRegion newRegion)
    throws IOException {
      byte[][] regionsToDelete = {oldRegion1, oldRegion2};
      for (int r = 0; r < regionsToDelete.length; r++) {
        if(Bytes.equals(regionsToDelete[r], latestRegion.getRegionName())) {
          latestRegion = null;
        }
        Delete delete = new Delete(regionsToDelete[r]);
        table.delete(delete);
        if(LOG.isDebugEnabled()) {
          LOG.debug("updated columns in row: " + Bytes.toStringBinary(regionsToDelete[r]));
        }
      }
      newRegion.getRegionInfo().setOffline(true);

      Put put = new Put(newRegion.getRegionName());
      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
        Writables.getBytes(newRegion.getRegionInfo()));
      table.put(put);

      if(LOG.isDebugEnabled()) {
        LOG.debug("updated columns in row: "
            + Bytes.toStringBinary(newRegion.getRegionName()));
      }
    }
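
The effect of updateMeta() on the catalog can be pictured with an ordinary sorted map. In this minimal sketch the TreeMap stands in for the .META. table, and the class MetaUpdateSketch, the row keys, and the values are all made up for illustration; real .META. row keys and serialized HRegionInfo values look different.

```java
import java.util.TreeMap;

// Hypothetical sketch: a TreeMap plays the role of the .META. table.
class MetaUpdateSketch {
    // Removes the rows of the two merged regions and adds the row of the new region.
    static void updateMeta(TreeMap<String, String> meta, String oldRegion1,
                           String oldRegion2, String mergedName, String mergedInfo) {
        meta.remove(oldRegion1);          // corresponds to table.delete(...) for region 1
        meta.remove(oldRegion2);          // corresponds to table.delete(...) for region 2
        meta.put(mergedName, mergedInfo); // corresponds to table.put(...) for the merged region
    }

    public static void main(String[] args) {
        TreeMap<String, String> meta = new TreeMap<String, String>();
        meta.put("t1,,1", "[, b)");
        meta.put("t1,b,2", "[b, d)");
        meta.put("t1,d,3", "[d, )");

        updateMeta(meta, "t1,,1", "t1,b,2", "t1,,4", "[, d) offline");
        System.out.println(meta.keySet()); // [t1,,4, t1,d,3]
    }
}
```

As in the traced code, the two deletes and the put are separate calls rather than a single atomic operation.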

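This code does not deal with the regions that are no longer in use.
If we compare it with the splitting flow, however,
those obsolete regions may be cleaned up by a garbage-collection routine in the Master.
I could not find documentation on this, so it may take an experiment to observe how HBase actually behaves.
Also, HMerge requires the target table to be disabled first (note that in the listing above the isTableDisabled check is commented out, so the tool itself does not enforce it).
For data safety, it is best to copy the table's data first,
run the merge on the copy, and, once the data is confirmed to be correct,
switch reads over to the copy and delete the original table, so that data continuity is preserved.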


References:
https://gist.github.com/bbeaudreault/7567385
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hbase/hbase/0.94.20/org/apache/hadoop/hbase/util/HMerge.java/
https://github.com/axfcampos/hbase-0.94.19/blob/master/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
http://www.cloudera.com/content/cloudera/en/documentation/shared/CDH5-Beta-2-RNs/hbase_jdiff_report-p-cdh4.5-c-cdh5b2/cdh4.5/org/apache/hadoop/hbase/util/HMerge.html
http://uestzengting.iteye.com/blog/1258826
https://issues.apache.org/jira/browse/HBASE-480
