[TRACE] org.apache.hadoop.hbase.regionserver.HRegion (merge, 0.94)

The previous HRegion trace covered the post-0.96 code base, where merging follows a much more complete procedure. In HBase 0.94 there are only two merge-related functions: mergeAdjacent() and merge(). mergeAdjacent() essentially performs a few checks and then calls merge(), so this article concentrates on merge().
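
Before reading merge() itself, the following hedged sketch (a paraphrase, not the verbatim 0.94 source) shows how mergeAdjacent() is expected to relate to merge(): it orders the two regions by start key, verifies that they are adjacent, and then delegates. The real method's handling of empty boundary keys is omitted here.

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;

final class MergeAdjacentSketch {
  // Paraphrased: order the regions, check adjacency, delegate to merge().
  static HRegion mergeAdjacentSketch(HRegion srcA, HRegion srcB) throws IOException {
    HRegion a = srcA;
    HRegion b = srcB;
    // Make sure a is the region with the smaller start key.
    if (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0) {
      a = srcB;
      b = srcA;
    }
    // Adjacent means: a's end key is exactly b's start key.
    if (Bytes.compareTo(a.getEndKey(), b.getStartKey()) != 0) {
      throw new IOException("Cannot merge non-adjacent regions");
    }
    return HRegion.merge(a, b);
  }
}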
The complete code of merge() is listed below:

4630   /**
4631    * Merge two regions whether they are adjacent or not.
4632    *
4633    * @param a region a
4634    * @param b region b
4635    * @return new merged region
4636    * @throws IOException
4637    */
4638   public static HRegion merge(HRegion a, HRegion b)
4639   throws IOException {
4640     if (!a.getRegionInfo().getTableNameAsString().equals(
4641         b.getRegionInfo().getTableNameAsString())) {
4642       throw new IOException("Regions do not belong to the same table");
4643     }
4644 
4645     FileSystem fs = a.getFilesystem();
4646 
4647     // Make sure each region's cache is empty
4648 
4649     a.flushcache();
4650     b.flushcache();
4651 
4652     // Compact each region so we only have one store file per family
4653 
4654     a.compactStores(true);
4655     if (LOG.isDebugEnabled()) {
4656       LOG.debug("Files for region: " + a);
4657       listPaths(fs, a.getRegionDir());
4658     }
4659     b.compactStores(true);
4660     if (LOG.isDebugEnabled()) {
4661       LOG.debug("Files for region: " + b);
4662       listPaths(fs, b.getRegionDir());
4663     }
4664 
4665     Configuration conf = a.getBaseConf();
4666     HTableDescriptor tabledesc = a.getTableDesc();
4667     HLog log = a.getLog();
4668     Path tableDir = a.getTableDir();
4669     // Presume both are of same region type -- i.e. both user or catalog
4670     // table regions.  This way can use comparator.
4671     final byte[] startKey =
4672       (a.comparator.matchingRows(a.getStartKey(), 0, a.getStartKey().length,
4673            HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
4674        || b.comparator.matchingRows(b.getStartKey(), 0,
4675               b.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
4676               HConstants.EMPTY_BYTE_ARRAY.length))
4677       ? HConstants.EMPTY_BYTE_ARRAY
4678       : (a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
4679              b.getStartKey(), 0, b.getStartKey().length) <= 0
4680          ? a.getStartKey()
4681          : b.getStartKey());
4682     final byte[] endKey =
4683       (a.comparator.matchingRows(a.getEndKey(), 0, a.getEndKey().length,
4684            HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
4685        || a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
4686               HConstants.EMPTY_BYTE_ARRAY, 0,
4687               HConstants.EMPTY_BYTE_ARRAY.length))
4688       ? HConstants.EMPTY_BYTE_ARRAY
4689       : (a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
4690              b.getEndKey(), 0, b.getEndKey().length) <= 0
4691          ? b.getEndKey()
4692          : a.getEndKey());
4693 
4694     HRegionInfo newRegionInfo =
4695         new HRegionInfo(tabledesc.getName(), startKey, endKey);
4696     LOG.info("Creating new region " + newRegionInfo.toString());
4697     String encodedName = newRegionInfo.getEncodedName();
4698     Path newRegionDir = HRegion.getRegionDir(a.getTableDir(), encodedName);
4699     if(fs.exists(newRegionDir)) {
4700       throw new IOException("Cannot merge; target file collision at " +
4701           newRegionDir);
4702     }
4703     HBaseFileSystem.makeDirOnFileSystem(fs, newRegionDir);
4704 
4705     LOG.info("starting merge of regions: " + a + " and " + b +
4706       " into new region " + newRegionInfo.toString() +
4707         " with start key <" + Bytes.toStringBinary(startKey) + "> and end key <" +
4708         Bytes.toStringBinary(endKey) + ">");
4709 
4710     // Move HStoreFiles under new region directory
4711     Map<byte [], List<StoreFile>> byFamily =
4712       new TreeMap<byte [], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
4713     byFamily = filesByFamily(byFamily, a.close());
4714     byFamily = filesByFamily(byFamily, b.close());
4715     for (Map.Entry<byte [], List<StoreFile>> es : byFamily.entrySet()) {
4716       byte [] colFamily = es.getKey();
4717       makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
4718       // Because we compacted the source regions we should have no more than two
4719       // HStoreFiles per family and there will be no reference store
4720       List<StoreFile> srcFiles = es.getValue();
4721       for (StoreFile hsf: srcFiles) {
4722         StoreFile.rename(fs, hsf.getPath(),
4723           StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir,
4724             newRegionInfo.getEncodedName(), colFamily)));
4725       }
4726     }
4727     if (LOG.isDebugEnabled()) {
4728       LOG.debug("Files for new region");
4729       listPaths(fs, newRegionDir);
4730     }
4731     HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf,
4732         newRegionInfo, a.getTableDesc(), null);
4733     long totalReadRequestCount = a.readRequestsCount.get() + b.readRequestsCount.get();
4734     dstRegion.readRequestsCount.set(totalReadRequestCount);
4735     dstRegion.opMetrics.setReadRequestCountMetrics(totalReadRequestCount);
4736     
4737     long totalWriteRequestCount = a.writeRequestsCount.get() + b.writeRequestsCount.get();
4738     dstRegion.writeRequestsCount.set(totalWriteRequestCount);
4739     dstRegion.opMetrics.setWriteRequestCountMetrics(totalWriteRequestCount);
4740     
4741     dstRegion.initialize();
4742     dstRegion.compactStores();
4743     if (LOG.isDebugEnabled()) {
4744       LOG.debug("Files for new region");
4745       listPaths(fs, dstRegion.getRegionDir());
4746     }
4747 
4748     // delete out the 'A' region
4749     HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(a.getConf()),
4750         a.getTableDir(), a.getRegionDir());
4751     // delete out the 'B' region
4752     HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(b.getConf()),
4753         b.getTableDir(), b.getRegionDir());
4754 
4755     LOG.info("merge completed. New region is " + dstRegion);
4756 
4757     return dstRegion;
4758   }


merge() takes exactly two arguments: the regions a and b to be merged. Compared with later versions, the 0.94 operation is noticeably cruder: there are no region state transitions, no recovery point for the data, and the whole procedure is packed into this single static method.

The method begins by flushing and major-compacting both region a and region b. With that preparation done, and on the assumption that the two regions are of the same type (both catalog (.META.) regions or both user-table regions), it derives the merged region's start key and end key, creates the merged region's directory under region a's table directory, and builds the new HRegionInfo from this information, as in the following code (a simplified restatement of the key-selection logic follows the excerpt):

4647     // Make sure each region's cache is empty
4648 
4649     a.flushcache();
4650     b.flushcache();
4651 
4652     // Compact each region so we only have one store file per family
4653 
4654     a.compactStores(true);
4655     if (LOG.isDebugEnabled()) {
4656       LOG.debug("Files for region: " + a);
4657       listPaths(fs, a.getRegionDir());
4658     }
4659     b.compactStores(true);
4660     if (LOG.isDebugEnabled()) {
4661       LOG.debug("Files for region: " + b);
4662       listPaths(fs, b.getRegionDir());
4663     }
4664 
4665     Configuration conf = a.getBaseConf();
4666     HTableDescriptor tabledesc = a.getTableDesc();
4667     HLog log = a.getLog();
4668     Path tableDir = a.getTableDir();
4669     // Presume both are of same region type -- i.e. both user or catalog
4670     // table regions.  This way can use comparator.
4671     final byte[] startKey =
4672       (a.comparator.matchingRows(a.getStartKey(), 0, a.getStartKey().length,
4673            HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
4674        || b.comparator.matchingRows(b.getStartKey(), 0,
4675               b.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
4676               HConstants.EMPTY_BYTE_ARRAY.length))
4677       ? HConstants.EMPTY_BYTE_ARRAY
4678       : (a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
4679              b.getStartKey(), 0, b.getStartKey().length) <= 0
4680          ? a.getStartKey()
4681          : b.getStartKey());
4682     final byte[] endKey =
4683       (a.comparator.matchingRows(a.getEndKey(), 0, a.getEndKey().length,
4684            HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
4685        || a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
4686               HConstants.EMPTY_BYTE_ARRAY, 0,
4687               HConstants.EMPTY_BYTE_ARRAY.length))
4688       ? HConstants.EMPTY_BYTE_ARRAY
4689       : (a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
4690              b.getEndKey(), 0, b.getEndKey().length) <= 0
4691          ? b.getEndKey()
4692          : a.getEndKey());
4693 
4694     HRegionInfo newRegionInfo =
4695         new HRegionInfo(tabledesc.getName(), startKey, endKey);
4696     LOG.info("Creating new region " + newRegionInfo.toString());
4697     String encodedName = newRegionInfo.getEncodedName();
4698     Path newRegionDir = HRegion.getRegionDir(a.getTableDir(), encodedName);
4699     if(fs.exists(newRegionDir)) {
4700       throw new IOException("Cannot merge; target file collision at " +
4701           newRegionDir);
4702     }
4703     HBaseFileSystem.makeDirOnFileSystem(fs, newRegionDir);
4704 
4705     LOG.info("starting merge of regions: " + a + " and " + b +
4706       " into new region " + newRegionInfo.toString() +
4707         " with start key <" + Bytes.toStringBinary(startKey) + "> and end key <" +
4708         Bytes.toStringBinary(endKey) + ">");
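
The nested ternary operators above are dense. This hedged sketch restates the same key-selection rule with plain Bytes.compareTo; the real code goes through a.comparator so that catalog (.META.) rows sort correctly, a detail this simplification ignores.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;

final class MergedKeyRange {
  static byte[] mergedStartKey(byte[] aStart, byte[] bStart) {
    // An empty start key means the region is open at the low end,
    // so the merged region must be open there as well.
    if (aStart.length == 0 || bStart.length == 0) {
      return HConstants.EMPTY_BYTE_ARRAY;
    }
    // Otherwise take the smaller of the two start keys.
    return Bytes.compareTo(aStart, bStart) <= 0 ? aStart : bStart;
  }

  static byte[] mergedEndKey(byte[] aEnd, byte[] bEnd) {
    // An empty end key means the region is open at the high end.
    if (aEnd.length == 0 || bEnd.length == 0) {
      return HConstants.EMPTY_BYTE_ARRAY;
    }
    // Otherwise take the larger of the two end keys.
    return Bytes.compareTo(aEnd, bEnd) <= 0 ? bEnd : aEnd;
  }
}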

At this point the new region's metadata exists, and most of it (configuration, table descriptor, HLog, table directory) is taken from region a. The next step moves (renames) the store files of region a and region b into the new region's directory; a sketch of the filesByFamily() helper, whose body is not part of this listing, follows the excerpt:

4710     // Move HStoreFiles under new region directory
4711     Map<byte [], List<StoreFile>> byFamily =
4712       new TreeMap<byte [], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
4713     byFamily = filesByFamily(byFamily, a.close());
4714     byFamily = filesByFamily(byFamily, b.close());
4715     for (Map.Entry<byte [], List<StoreFile>> es : byFamily.entrySet()) {
4716       byte [] colFamily = es.getKey();
4717       makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
4718       // Because we compacted the source regions we should have no more than two
4719       // HStoreFiles per family and there will be no reference store
4720       List<StoreFile> srcFiles = es.getValue();
4721       for (StoreFile hsf: srcFiles) {
4722         StoreFile.rename(fs, hsf.getPath(),
4723           StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir,
4724             newRegionInfo.getEncodedName(), colFamily)));
4725       }
4726     }
4727     if (LOG.isDebugEnabled()) {
4728       LOG.debug("Files for new region");
4729       listPaths(fs, newRegionDir);
4730     }
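
The loop above relies on the private helper filesByFamily(), which the listing does not show. Judging only from how it is called, a plausible sketch is the following; the signature is taken from the call site, but the path-based family lookup is my assumption.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;

final class FilesByFamilySketch {
  // Group the store files returned by HRegion.close() by column family,
  // so the merge loop can create one target directory per family.
  static Map<byte[], List<StoreFile>> filesByFamily(
      Map<byte[], List<StoreFile>> byFamily, List<StoreFile> storeFiles) {
    for (StoreFile sf : storeFiles) {
      // Store file layout: .../<table>/<encoded region>/<family>/<hfile>
      byte[] family = Bytes.toBytes(sf.getPath().getParent().getName());
      List<StoreFile> files = byFamily.get(family);
      if (files == null) {
        files = new ArrayList<StoreFile>();
        byFamily.put(family, files);
      }
      files.add(sf);
    }
    return byFamily;
  }
}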

With the files in place, a new HRegion object (dstRegion) is instantiated. Some counters (readRequestsCount, writeRequestsCount) are inherited from region a and region b, and dstRegion is then initialized and compacted:

4731     HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf,
4732         newRegionInfo, a.getTableDesc(), null);
4733     long totalReadRequestCount = a.readRequestsCount.get() + b.readRequestsCount.get();
4734     dstRegion.readRequestsCount.set(totalReadRequestCount);
4735     dstRegion.opMetrics.setReadRequestCountMetrics(totalReadRequestCount);
4736     
4737     long totalWriteRequestCount = a.writeRequestsCount.get() + b.writeRequestsCount.get();
4738     dstRegion.writeRequestsCount.set(totalWriteRequestCount);
4739     dstRegion.opMetrics.setWriteRequestCountMetrics(totalWriteRequestCount);
4740     
4741     dstRegion.initialize();
4742     dstRegion.compactStores();
4743     if (LOG.isDebugEnabled()) {
4744       LOG.debug("Files for new region");
4745       listPaths(fs, dstRegion.getRegionDir());
4746     }

Once the new region is open, the directories of the original regions a and b, which were already closed when their store files were collected above, are archived, and the new region object is returned, completing the merge:

4748     // delete out the 'A' region
4749     HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(a.getConf()),
4750         a.getTableDir(), a.getRegionDir());
4751     // delete out the 'B' region
4752     HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(b.getConf()),
4753         b.getTableDir(), b.getRegionDir());
4754 
4755     LOG.info("merge completed. New region is " + dstRegion);
4756 
4757     return dstRegion;

Unlike the merge implementations in later versions, 0.94 does not explicitly remove the regions from the RegionServer (take them offline); it only archives their HFiles, the HRegion objects themselves having been closed earlier via close(). Whether the region a and region b objects truly disappear after archiving is not clear from this method alone, and deserves further testing...
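
For completeness, here is a hypothetical sketch of how a caller, such as an offline merge utility, might drive this static method. The regionA/regionB handles and the clean-up responsibilities shown here are assumptions, not something traced from the source.

// Assumes regionA and regionB are already-open HRegion handles for two
// regions of the same table, obtained while the table is offline.
HRegion merged = HRegion.merge(regionA, regionB);
try {
  // merge() hands back an initialized, compacted region; updating .META.
  // and removing the old regions from the cluster's view is presumably
  // left to the caller, which matches the concern raised above.
  System.out.println("New region: " + merged.getRegionNameAsString());
} finally {
  merged.close();  // close the merged region's stores when done
}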

References:
http://hbase.apache.org/0.94/apidocs/org/apache/hadoop/hbase/regionserver/HRegion.html
http://hbase.apache.org/0.94/xref/org/apache/hadoop/hbase/regionserver/HRegion.html
