[TRACE] org.apache.hadoop.hbase.regionserver.HRegion (merge, 0.94)
The previous article tracing HRegion was based on the 0.96+ versions,
which therefore had a more complete procedure for performing a merge.
In HBase 0.94, there are two functions related to merging:
mergeAdjacent() and merge().
In fact, mergeAdjacent() simply runs a few checks and then calls merge(),
so this article focuses on reading merge().
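mergeAdjacent() itself is not reproduced here; as noted, it only runs some checks before delegating to merge(). As a rough illustration (a sketch, not the actual 0.94 code), an adjacency test between two regions can be expressed as: two regions touch when one region's end key equals the other region's start key.

```java
import java.util.Arrays;

public class AdjacencyCheck {
    // Sketch of an adjacency test for key ranges [aStart, aEnd) and
    // [bStart, bEnd): the regions touch when one's end key equals the
    // other's start key. (Hypothetical helper; the real check lives
    // inside HRegion.mergeAdjacent().)
    static boolean adjacent(byte[] aStart, byte[] aEnd,
                            byte[] bStart, byte[] bEnd) {
        return Arrays.equals(aEnd, bStart) || Arrays.equals(bEnd, aStart);
    }

    public static void main(String[] args) {
        boolean touching = adjacent("a".getBytes(), "m".getBytes(),
                                    "m".getBytes(), "z".getBytes());
        boolean gap = adjacent("a".getBytes(), "f".getBytes(),
                               "m".getBytes(), "z".getBytes());
        System.out.println(touching + " " + gap); // true false
    }
}
```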
Here is the complete code of merge():
/**
 * Merge two regions whether they are adjacent or not.
 *
 * @param a region a
 * @param b region b
 * @return new merged region
 * @throws IOException
 */
public static HRegion merge(HRegion a, HRegion b)
throws IOException {
  if (!a.getRegionInfo().getTableNameAsString().equals(
      b.getRegionInfo().getTableNameAsString())) {
    throw new IOException("Regions do not belong to the same table");
  }

  FileSystem fs = a.getFilesystem();

  // Make sure each region's cache is empty

  a.flushcache();
  b.flushcache();

  // Compact each region so we only have one store file per family

  a.compactStores(true);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Files for region: " + a);
    listPaths(fs, a.getRegionDir());
  }
  b.compactStores(true);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Files for region: " + b);
    listPaths(fs, b.getRegionDir());
  }

  Configuration conf = a.getBaseConf();
  HTableDescriptor tabledesc = a.getTableDesc();
  HLog log = a.getLog();
  Path tableDir = a.getTableDir();
  // Presume both are of same region type -- i.e. both user or catalog
  // table regions.  This way can use comparator.
  final byte[] startKey =
    (a.comparator.matchingRows(a.getStartKey(), 0, a.getStartKey().length,
         HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
     || b.comparator.matchingRows(b.getStartKey(), 0,
         b.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
         HConstants.EMPTY_BYTE_ARRAY.length))
    ? HConstants.EMPTY_BYTE_ARRAY
    : (a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
           b.getStartKey(), 0, b.getStartKey().length) <= 0
       ? a.getStartKey()
       : b.getStartKey());
  final byte[] endKey =
    (a.comparator.matchingRows(a.getEndKey(), 0, a.getEndKey().length,
         HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
     || a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
         HConstants.EMPTY_BYTE_ARRAY, 0,
         HConstants.EMPTY_BYTE_ARRAY.length))
    ? HConstants.EMPTY_BYTE_ARRAY
    : (a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
           b.getEndKey(), 0, b.getEndKey().length) <= 0
       ? b.getEndKey()
       : a.getEndKey());

  HRegionInfo newRegionInfo =
      new HRegionInfo(tabledesc.getName(), startKey, endKey);
  LOG.info("Creating new region " + newRegionInfo.toString());
  String encodedName = newRegionInfo.getEncodedName();
  Path newRegionDir = HRegion.getRegionDir(a.getTableDir(), encodedName);
  if (fs.exists(newRegionDir)) {
    throw new IOException("Cannot merge; target file collision at " +
        newRegionDir);
  }
  HBaseFileSystem.makeDirOnFileSystem(fs, newRegionDir);

  LOG.info("starting merge of regions: " + a + " and " + b +
      " into new region " + newRegionInfo.toString() +
      " with start key <" + Bytes.toStringBinary(startKey) + "> and end key <" +
      Bytes.toStringBinary(endKey) + ">");

  // Move HStoreFiles under new region directory
  Map<byte[], List<StoreFile>> byFamily =
      new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
  byFamily = filesByFamily(byFamily, a.close());
  byFamily = filesByFamily(byFamily, b.close());
  for (Map.Entry<byte[], List<StoreFile>> es : byFamily.entrySet()) {
    byte[] colFamily = es.getKey();
    makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
    // Because we compacted the source regions we should have no more than two
    // HStoreFiles per family and there will be no reference store
    List<StoreFile> srcFiles = es.getValue();
    for (StoreFile hsf : srcFiles) {
      StoreFile.rename(fs, hsf.getPath(),
          StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir,
              newRegionInfo.getEncodedName(), colFamily)));
    }
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Files for new region");
    listPaths(fs, newRegionDir);
  }
  HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf,
      newRegionInfo, a.getTableDesc(), null);
  long totalReadRequestCount = a.readRequestsCount.get() + b.readRequestsCount.get();
  dstRegion.readRequestsCount.set(totalReadRequestCount);
  dstRegion.opMetrics.setReadRequestCountMetrics(totalReadRequestCount);

  long totalWriteRequestCount = a.writeRequestsCount.get() + b.writeRequestsCount.get();
  dstRegion.writeRequestsCount.set(totalWriteRequestCount);
  dstRegion.opMetrics.setWriteRequestCountMetrics(totalWriteRequestCount);

  dstRegion.initialize();
  dstRegion.compactStores();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Files for new region");
    listPaths(fs, dstRegion.getRegionDir());
  }

  // delete out the 'A' region
  HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(a.getConf()),
      a.getTableDir(), a.getRegionDir());
  // delete out the 'B' region
  HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(b.getConf()),
      b.getTableDir(), b.getRegionDir());

  LOG.info("merge completed. New region is " + dstRegion);

  return dstRegion;
}
merge() takes two parameters: the two regions to be merged, Region a and Region b.
In 0.94, the merge operation is visibly cruder than in later versions:
there are no region state transitions, no recovery point for the data,
and the whole procedure is packed into a single method.
At the start, Region a and Region b are flushed and compacted.
With that preparation done, the code assumes Region a and Region b are of the same kind (.META. or a regular user table),
computes the merged region's start key and end key, creates the merged region's directory under Region a's table directory,
and uses this information to build an HRegionInfo, as in the following code:
// Make sure each region's cache is empty

a.flushcache();
b.flushcache();

// Compact each region so we only have one store file per family

a.compactStores(true);
if (LOG.isDebugEnabled()) {
  LOG.debug("Files for region: " + a);
  listPaths(fs, a.getRegionDir());
}
b.compactStores(true);
if (LOG.isDebugEnabled()) {
  LOG.debug("Files for region: " + b);
  listPaths(fs, b.getRegionDir());
}

Configuration conf = a.getBaseConf();
HTableDescriptor tabledesc = a.getTableDesc();
HLog log = a.getLog();
Path tableDir = a.getTableDir();
// Presume both are of same region type -- i.e. both user or catalog
// table regions.  This way can use comparator.
final byte[] startKey =
  (a.comparator.matchingRows(a.getStartKey(), 0, a.getStartKey().length,
       HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
   || b.comparator.matchingRows(b.getStartKey(), 0,
       b.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
       HConstants.EMPTY_BYTE_ARRAY.length))
  ? HConstants.EMPTY_BYTE_ARRAY
  : (a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
         b.getStartKey(), 0, b.getStartKey().length) <= 0
     ? a.getStartKey()
     : b.getStartKey());
final byte[] endKey =
  (a.comparator.matchingRows(a.getEndKey(), 0, a.getEndKey().length,
       HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
   || a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
       HConstants.EMPTY_BYTE_ARRAY, 0,
       HConstants.EMPTY_BYTE_ARRAY.length))
  ? HConstants.EMPTY_BYTE_ARRAY
  : (a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
         b.getEndKey(), 0, b.getEndKey().length) <= 0
     ? b.getEndKey()
     : a.getEndKey());

HRegionInfo newRegionInfo =
    new HRegionInfo(tabledesc.getName(), startKey, endKey);
LOG.info("Creating new region " + newRegionInfo.toString());
String encodedName = newRegionInfo.getEncodedName();
Path newRegionDir = HRegion.getRegionDir(a.getTableDir(), encodedName);
if (fs.exists(newRegionDir)) {
  throw new IOException("Cannot merge; target file collision at " +
      newRegionDir);
}
HBaseFileSystem.makeDirOnFileSystem(fs, newRegionDir);

LOG.info("starting merge of regions: " + a + " and " + b +
    " into new region " + newRegionInfo.toString() +
    " with start key <" + Bytes.toStringBinary(startKey) + "> and end key <" +
    Bytes.toStringBinary(endKey) + ">");
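The two nested ternaries compute the merged key range; stripped of the comparator plumbing they implement a simple rule. The sketch below re-states it using plain lexicographic byte comparison (java.util.Arrays.compare, so unlike HRegion's comparator it would not handle .META. row keys correctly): an empty key stands for an open boundary and always wins; otherwise the smaller start key and the larger end key are kept.

```java
import java.util.Arrays;

public class MergedKeys {
    static final byte[] EMPTY = new byte[0];

    // An empty start key means the region is unbounded from below, so the
    // merged region inherits it; otherwise take the smaller of the two starts.
    static byte[] mergedStartKey(byte[] aStart, byte[] bStart) {
        if (aStart.length == 0 || bStart.length == 0) return EMPTY;
        return Arrays.compare(aStart, bStart) <= 0 ? aStart : bStart;
    }

    // Symmetrically, an empty end key means unbounded from above; otherwise
    // take the larger of the two ends.
    static byte[] mergedEndKey(byte[] aEnd, byte[] bEnd) {
        if (aEnd.length == 0 || bEnd.length == 0) return EMPTY;
        return Arrays.compare(aEnd, bEnd) <= 0 ? bEnd : aEnd;
    }

    public static void main(String[] args) {
        byte[] start = mergedStartKey("b".getBytes(), "d".getBytes());
        byte[] end = mergedEndKey("c".getBytes(), EMPTY);
        // merging [b, c) and [d, open) yields [b, open)
        System.out.println(new String(start) + " / end length " + end.length);
    }
}
```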
At this point the new region's metadata is ready,
and most of it is based on Region a.
Next, the store files of Region a and Region b are moved (renamed) into the new region's directory:
// Move HStoreFiles under new region directory
Map<byte[], List<StoreFile>> byFamily =
    new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
byFamily = filesByFamily(byFamily, a.close());
byFamily = filesByFamily(byFamily, b.close());
for (Map.Entry<byte[], List<StoreFile>> es : byFamily.entrySet()) {
  byte[] colFamily = es.getKey();
  makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
  // Because we compacted the source regions we should have no more than two
  // HStoreFiles per family and there will be no reference store
  List<StoreFile> srcFiles = es.getValue();
  for (StoreFile hsf : srcFiles) {
    StoreFile.rename(fs, hsf.getPath(),
        StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir,
            newRegionInfo.getEncodedName(), colFamily)));
  }
}
if (LOG.isDebugEnabled()) {
  LOG.debug("Files for new region");
  listPaths(fs, newRegionDir);
}
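filesByFamily() is a private HRegion helper that folds one region's store files into the shared map. Its effect can be sketched as below; StoreFile is replaced by a plain path string, so this is only an illustration of the grouping step, not the real API. The detail worth noting is that byte[] keys compare by identity in Java, so without an explicit lexicographic comparator (the role Bytes.BYTES_COMPARATOR plays in the real code) the same family name coming from the two regions would occupy two different map entries.

```java
import java.util.*;

public class GroupByFamily {
    // byte[] has identity-based equals, so family-name keys need an explicit
    // lexicographic comparator (the real code uses Bytes.BYTES_COMPARATOR).
    static final Comparator<byte[]> BYTES_LEX = Arrays::compare;

    // Fold one region's (family -> file) entries into the shared map;
    // called once per source region, like filesByFamily(byFamily, a.close()).
    static Map<byte[], List<String>> filesByFamily(
            Map<byte[], List<String>> byFamily, Map<byte[], String> regionFiles) {
        for (Map.Entry<byte[], String> e : regionFiles.entrySet()) {
            byFamily.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                    .add(e.getValue());
        }
        return byFamily;
    }

    public static void main(String[] args) {
        Map<byte[], List<String>> byFamily = new TreeMap<>(BYTES_LEX);

        Map<byte[], String> regionA = new TreeMap<>(BYTES_LEX);
        regionA.put("cf1".getBytes(), "regionA/cf1/storefile1");
        Map<byte[], String> regionB = new TreeMap<>(BYTES_LEX);
        regionB.put("cf1".getBytes(), "regionB/cf1/storefile2");

        byFamily = filesByFamily(byFamily, regionA);
        byFamily = filesByFamily(byFamily, regionB);

        // both regions' files for family "cf1" land under a single key
        System.out.println(byFamily.get("cf1".getBytes()));
    }
}
```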
With this preparation complete, a new HRegion object (dstRegion) is created,
some of its counters (e.g. readRequestsCount, writeRequestsCount) are inherited from Region a and Region b,
and dstRegion is then initialized:
HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf,
    newRegionInfo, a.getTableDesc(), null);
long totalReadRequestCount = a.readRequestsCount.get() + b.readRequestsCount.get();
dstRegion.readRequestsCount.set(totalReadRequestCount);
dstRegion.opMetrics.setReadRequestCountMetrics(totalReadRequestCount);

long totalWriteRequestCount = a.writeRequestsCount.get() + b.writeRequestsCount.get();
dstRegion.writeRequestsCount.set(totalWriteRequestCount);
dstRegion.opMetrics.setWriteRequestCountMetrics(totalWriteRequestCount);

dstRegion.initialize();
dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
  LOG.debug("Files for new region");
  listPaths(fs, dstRegion.getRegionDir());
}
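The counter handoff is plain accumulation: the merged region's request counters start at the sum of both sources' values. Judging from the get()/set() calls, these fields behave like AtomicLong counters; a minimal stand-alone illustration with invented values:

```java
import java.util.concurrent.atomic.AtomicLong;

public class CounterHandoff {
    public static void main(String[] args) {
        // request counters of the two source regions (values are invented)
        AtomicLong aReads = new AtomicLong(120);
        AtomicLong bReads = new AtomicLong(80);

        // the merged region starts from the sum, mirroring
        // dstRegion.readRequestsCount.set(a.get() + b.get())
        AtomicLong dstReads = new AtomicLong();
        dstReads.set(aReads.get() + bReads.get());

        System.out.println(dstReads.get()); // 200
    }
}
```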
Once the new region is open, the directories of the original Region a and Region b (already closed during the file move) are archived,
and the new region object is returned, completing the merge:
// delete out the 'A' region
HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(a.getConf()),
    a.getTableDir(), a.getRegionDir());
// delete out the 'B' region
HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(b.getConf()),
    b.getTableDir(), b.getRegionDir());

LOG.info("merge completed. New region is " + dstRegion);

return dstRegion;
Unlike the later merge implementations,
version 0.94 never explicitly takes the regions offline from the RegionServer;
it only archives their HFiles. Whether the Region a and Region b objects disappear after archiving is unclear,
and that part deserves further testing...
References:
http://hbase.apache.org/0.94/apidocs/org/apache/hadoop/hbase/regionserver/HRegion.html
http://hbase.apache.org/0.94/xref/org/apache/hadoop/hbase/regionserver/HRegion.html