[TRACE] org.apache.hadoop.hbase.regionserver.RegionMergeTransaction

In a previous article we introduced the org.apache.hadoop.hbase.regionserver package.
Within that package, a Region merge is carried out by a RegionMergeTransaction object:

RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);

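Here a and b are the Regions to be merged, and the third argument (true) is the forcible flag.
Note that rmt is the transaction object, not the merged Region; the merged Region is what rmt.execute() returns.
To understand in detail what happens to a Region during a merge,
we continue by reading the source code of RegionMergeTransaction.

First, let's look at the state machine of RegionMergeTransaction: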

  /**
   * Types to add to the transaction journal. Each enum is a step in the merge
   * transaction. Used to figure how much we need to rollback.
   */
  enum JournalEntry {
    /**
     * Set region as in transition, set it into MERGING state.
     */
    SET_MERGING,
    /**
     * We created the temporary merge data directory.
     */
    CREATED_MERGE_DIR,
    /**
     * Closed the merging region A.
     */
    CLOSED_REGION_A,
    /**
     * The merging region A has been taken out of the server's online regions list.
     */
    OFFLINED_REGION_A,
    /**
     * Closed the merging region B.
     */
    CLOSED_REGION_B,
    /**
     * The merging region B has been taken out of the server's online regions list.
     */
    OFFLINED_REGION_B,
    /**
     * Started in on creation of the merged region.
     */
    STARTED_MERGED_REGION_CREATION,
    /**
     * Point of no return. If we got here, then transaction is not recoverable
     * other than by crashing out the regionserver.
     */
    PONR
  }

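During a RegionMergeTransaction, Region a and Region b are first put into the MERGING state
and a temporary merge directory is created.
The two Regions are then closed and taken offline, and the merged Region is created from their files.
Finally, once the point of no return (PONR) has been passed, the merged Region is opened.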
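
The journal comment says it is "used to figure how much we need to rollback". A minimal sketch of that idea follows. It is not the HBase implementation: the class JournalSketch, its record() and rollback() methods, and the choice to throw once PONR has been recorded are all assumptions made for illustration. Only the step names are taken from JournalEntry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a journaled transaction; not the HBase class.
class JournalSketch {
    // Step names copied from JournalEntry; everything else here is illustrative.
    enum Step {
        SET_MERGING, CREATED_MERGE_DIR, CLOSED_REGION_A, OFFLINED_REGION_A,
        CLOSED_REGION_B, OFFLINED_REGION_B, STARTED_MERGED_REGION_CREATION, PONR
    }

    private final List<Step> journal = new ArrayList<>();

    // Record that a step has been reached.
    public void record(Step step) {
        journal.add(step);
    }

    // Walk the journal from newest to oldest and return the steps in undo order.
    // Assumption of this sketch: a journal containing PONR cannot be rolled back.
    public List<Step> rollback() {
        List<Step> undone = new ArrayList<>();
        for (int i = journal.size() - 1; i >= 0; i--) {
            Step step = journal.get(i);
            if (step == Step.PONR) {
                throw new IllegalStateException("Past the point of no return");
            }
            undone.add(step); // a real transaction would undo the step here
        }
        journal.clear();
        return undone;
    }

    public static void main(String[] args) {
        JournalSketch tx = new JournalSketch();
        tx.record(Step.SET_MERGING);
        tx.record(Step.CREATED_MERGE_DIR);
        tx.record(Step.CLOSED_REGION_A);
        // prints [CLOSED_REGION_A, CREATED_MERGE_DIR, SET_MERGING]
        System.out.println(tx.rollback());
    }
}
```

Because only the steps that were actually reached appear in the journal, a failure early in the merge undoes only those steps, in reverse order.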

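We skip the precondition checks of RegionMergeTransaction here
(prepare() verifies that the two Regions belong to the same table, whether they are adjacent, their Region state, and so on)
and go straight to the main routine that runs the transaction: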

  /**
   * Run the transaction.
   * @param server Hosting server instance. Can be null when testing
   * @param services Used to online/offline regions.
   * @throws IOException If thrown, transaction failed. Call
   *           {@link #rollback(Server, RegionServerServices)}
   * @return merged region
   * @throws IOException
   * @see #rollback(Server, RegionServerServices)
   */
  public HRegion execute(final Server server,
      final RegionServerServices services) throws IOException {
    if (rsCoprocessorHost == null) {
      rsCoprocessorHost = server != null ?
        ((HRegionServer) server).getRegionServerCoprocessorHost() : null;
    }
    HRegion mergedRegion = createMergedRegion(server, services);
    if (rsCoprocessorHost != null) {
      rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
    }
    return stepsAfterPONR(server, services, mergedRegion);
  }

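execute() first obtains the RegionServer's coprocessor host and then calls createMergedRegion().
createMergedRegion() in turn calls stepsBeforePONR(server, services, testing).
When that returns, control goes back to execute(), which runs stepsAfterPONR(server, services, mergedRegion).
PONR (point of no return) is the last state in the state machine above.
Once the PONR has been passed, the transaction can no longer return to the pre-merge state; as the JournalEntry comment puts it, the only recovery is to crash out the RegionServer.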

  /**
   * Prepare the merged region and region files.
   * @param server Hosting server instance. Can be null when testing
   * @param services Used to online/offline regions.
   * @return merged region
   * @throws IOException If thrown, transaction failed. Call
   *           {@link #rollback(Server, RegionServerServices)}
   */
  HRegion createMergedRegion(final Server server,
      final RegionServerServices services) throws IOException {
    LOG.info("Starting merge of " + region_a + " and "
        + region_b.getRegionNameAsString() + ", forcible=" + forcible);
    if ((server != null && server.isStopped())
        || (services != null && services.isStopping())) {
      throw new IOException("Server is stopped or stopping");
    }

    if (rsCoprocessorHost != null) {
      if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
        throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
            + this.region_b + " merge.");
      }
    }

    // If true, no cluster to write meta edits to or to use coordination.
    boolean testing = server == null ? true : server.getConfiguration()
        .getBoolean("hbase.testing.nocluster", false);

    HRegion mergedRegion = stepsBeforePONR(server, services, testing);

    @MetaMutationAnnotation
    List<Mutation> metaEntries = new ArrayList<Mutation>();
    if (rsCoprocessorHost != null) {
      if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
        throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
            + this.region_b + " merge.");
      }
      try {
        for (Mutation p : metaEntries) {
          HRegionInfo.parseRegionName(p.getRow());
        }
      } catch (IOException e) {
        LOG.error("Row key of mutation from coprocessor is not parsable as region name."
            + "Mutations from coprocessor should only be for hbase:meta table.", e);
        throw e;
      }
    }

    // This is the point of no return. Similar with SplitTransaction.
    // IF we reach the PONR then subsequent failures need to crash out this
    // regionserver
    this.journal.add(JournalEntry.PONR);

    // Add merged region and delete region_a and region_b
    // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
    // will determine whether the region is merged or not in case of failures.
    // If it is successful, master will roll-forward, if not, master will
    // rollback
    if (services != null && !services.reportRegionStateTransition(TransitionCode.MERGE_PONR,
        mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
      // Passed PONR, let SSH clean it up
      throw new IOException("Failed to notify master that merge passed PONR: "
        + region_a.getRegionInfo().getRegionNameAsString() + " and "
        + region_b.getRegionInfo().getRegionNameAsString());
    }
    return mergedRegion;
  }

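stepsBeforePONR first notifies the Master,
so that the state of Region a and Region b, the Regions about to be merged, changes (READY_TO_MERGE).
It then creates the merges directory in the file system of Region a,
closes and offlines Region a and Region b, merges their StoreFiles,
and sets up the reference files.
Finally, it creates the merged Region through createMergedRegionFromMerges;
for details of createMergedRegionFromMerges, see the article on HRegion.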

  public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
      boolean testing) throws IOException {
    if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE,
        mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
      throw new IOException("Failed to get ok from master to merge "
        + region_a.getRegionInfo().getRegionNameAsString() + " and "
        + region_b.getRegionInfo().getRegionNameAsString());
    }
    this.journal.add(JournalEntry.SET_MERGING);

    this.region_a.getRegionFileSystem().createMergesDir();
    this.journal.add(JournalEntry.CREATED_MERGE_DIR);

    Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
        services, this.region_a, true, testing);
    Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
        services, this.region_b, false, testing);

    assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;


    //
    // mergeStoreFiles creates merged region dirs under the region_a merges dir
    // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will
    // clean this up.
    mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);

    // Log to the journal that we are creating merged region. We could fail
    // halfway through. If we do, we could have left
    // stuff in fs that needs cleanup -- a storefile or two. Thats why we
    // add entry to journal BEFORE rather than AFTER the change.
    this.journal.add(JournalEntry.STARTED_MERGED_REGION_CREATION);
    HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
        this.region_b, this.mergedRegionInfo);
    return mergedRegion;
  }
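
The comment before STARTED_MERGED_REGION_CREATION explains why that entry is journaled before the step rather than after it. The sketch below tries to make the difference concrete with a step that fails halfway. It is a toy model, not HBase code: the class JournalBeforeSketch, the in-memory list standing in for the file system, and the "merged/storefile-N" paths are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of "journal BEFORE rather than AFTER the change".
class JournalBeforeSketch {
    static final String ENTRY = "STARTED_MERGED_REGION_CREATION";

    // Hypothetical step that writes two "files" and fails after the first one.
    static void createMergedRegion(List<String> fs) {
        fs.add("merged/storefile-1");
        throw new IllegalStateException("simulated failure halfway through");
    }

    // Runs the failing step and returns whatever is left in the fake file
    // system after rollback.
    public static List<String> run(boolean journalBefore) {
        List<String> fs = new ArrayList<>();
        List<String> journal = new ArrayList<>();
        try {
            if (journalBefore) {
                journal.add(ENTRY);
            }
            createMergedRegion(fs);
            if (!journalBefore) {
                journal.add(ENTRY); // never reached, because the step threw
            }
        } catch (IllegalStateException e) {
            // Rollback only cleans up what the journal says was attempted.
            if (journal.contains(ENTRY)) {
                fs.removeIf(path -> path.startsWith("merged/"));
            }
        }
        return fs;
    }

    public static void main(String[] args) {
        System.out.println("journal before: " + run(true));  // prints journal before: []
        System.out.println("journal after: " + run(false));  // prints journal after: [merged/storefile-1]
    }
}
```

When the entry is written first, rollback knows the step was attempted and cleans up the partial output; when it is written afterwards, the half-written file is left behind.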

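After stepsBeforePONR completes and the PONR has been recorded, the transaction enters stepsAfterPONR.
This function simply opens the merged Region and brings it online,
notifies the Master that Region a and Region b have been merged so that their state changes (mergedRegion),
and then invokes the postMerge coprocessor hook: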

  public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
      HRegion mergedRegion) throws IOException {
    openMergedRegion(server, services, mergedRegion);
    if (rsCoprocessorHost != null) {
      rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
    }
    return mergedRegion;
  }
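
To tie the four methods together, here is a small sketch that only records the order in which the excerpts above perform their steps. It mirrors the call structure of execute(), createMergedRegion(), stepsBeforePONR() and stepsAfterPONR() as quoted in this article, but the class MergeFlowSketch and the step labels are hypothetical, and no real HBase API is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the call order seen in the quoted source; no HBase calls.
class MergeFlowSketch {
    private final List<String> trace = new ArrayList<>();

    // Mirrors execute(): create the merged region, then run the post-PONR steps.
    public List<String> execute() {
        createMergedRegion();
        trace.add("postMergeCommit hook");
        stepsAfterPONR();
        return trace;
    }

    private void createMergedRegion() {
        trace.add("preMerge hook");
        stepsBeforePONR();
        trace.add("preMergeCommit hook");
        trace.add("journal PONR");
        trace.add("report MERGE_PONR to master");
    }

    private void stepsBeforePONR() {
        trace.add("report READY_TO_MERGE to master");
        trace.add("create merges dir");
        trace.add("close and offline region A");
        trace.add("close and offline region B");
        trace.add("merge store files");
        trace.add("create merged region");
    }

    private void stepsAfterPONR() {
        trace.add("open merged region");
        trace.add("postMerge hook");
    }

    public static void main(String[] args) {
        List<String> steps = new MergeFlowSketch().execute();
        for (int i = 0; i < steps.size(); i++) {
            System.out.println((i + 1) + ". " + steps.get(i));
        }
    }
}
```

Running it lists 13 steps, with "journal PONR" falling after the merged Region has been created and before it is opened, which is the boundary between the steps that can be rolled back and those that cannot.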

References:
https://hbase.apache.org/xref/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.html
https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.html
https://hbase.apache.org/xref/org/apache/hadoop/hbase/regionserver/HRegion.html
https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/HRegion.html
