[TRACE] org.apache.hadoop.hbase.regionserver.RegionMergeTransaction
In a previous article we introduced org.apache.hadoop.hbase.regionserver.
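In org.apache.hadoop.hbase.regionserver, a RegionMergeTransaction object is used to carry out a Region merge:
RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
Here a and b are the Regions to be merged, and the third argument is the forcible flag that shows up in the log message of createMergedRegion().
rmt is the transaction object, not the merged Region itself; the newly created merged Region is what rmt.execute() returns.
To understand in detail what happens when Regions are merged,
we continue by reading the source code of RegionMergeTransaction.
First, let's look at the state machine of RegionMergeTransaction: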
/**
 * Types to add to the transaction journal. Each enum is a step in the merge
 * transaction. Used to figure how much we need to rollback.
 */
enum JournalEntry {
  /**
   * Set region as in transition, set it into MERGING state.
   */
  SET_MERGING,
  /**
   * We created the temporary merge data directory.
   */
  CREATED_MERGE_DIR,
  /**
   * Closed the merging region A.
   */
  CLOSED_REGION_A,
  /**
   * The merging region A has been taken out of the server's online regions list.
   */
  OFFLINED_REGION_A,
  /**
   * Closed the merging region B.
   */
  CLOSED_REGION_B,
  /**
   * The merging region B has been taken out of the server's online regions list.
   */
  OFFLINED_REGION_B,
  /**
   * Started in on creation of the merged region.
   */
  STARTED_MERGED_REGION_CREATION,
  /**
   * Point of no return. If we got here, then transaction is not recoverable
   * other than by crashing out the regionserver.
   */
  PONR
}
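During a RegionMergeTransaction, Regions a and b are first put into the MERGING state.
Next, a temporary merge directory is created,
the Regions being merged, Region a and Region b, are closed and taken offline,
and finally creation of the merged Region is started. The last entry, PONR, marks the point of no return.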
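The enum comment says the journal is "used to figure how much we need to rollback". A minimal sketch of that idea follows, assuming a simplified model in which every recorded step has one undo action. The class name JournalRollbackSketch and the "undo ..." strings are hypothetical and are not taken from the HBase source; only the entry names come from the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Hypothetical, simplified model of a journal-driven rollback; not the HBase implementation.
class JournalRollbackSketch {
    // Same step names as the JournalEntry enum quoted in the article.
    enum JournalEntry {
        SET_MERGING, CREATED_MERGE_DIR, CLOSED_REGION_A, OFFLINED_REGION_A,
        CLOSED_REGION_B, OFFLINED_REGION_B, STARTED_MERGED_REGION_CREATION, PONR
    }

    // Walks the journal from the newest entry to the oldest and returns the undo
    // actions in the order they would run. Throws if PONR was already recorded,
    // because nothing can be undone after that point.
    static List<String> rollback(List<JournalEntry> journal) {
        List<String> undone = new ArrayList<>();
        ListIterator<JournalEntry> it = journal.listIterator(journal.size());
        while (it.hasPrevious()) {
            JournalEntry entry = it.previous();
            if (entry == JournalEntry.PONR) {
                throw new IllegalStateException("Past the point of no return; cannot roll back");
            }
            undone.add("undo " + entry);
        }
        return undone;
    }

    public static void main(String[] args) {
        List<JournalEntry> journal = new ArrayList<>();
        journal.add(JournalEntry.SET_MERGING);
        journal.add(JournalEntry.CREATED_MERGE_DIR);
        journal.add(JournalEntry.CLOSED_REGION_A);
        // Suppose the transaction fails here: only the three recorded steps are undone, newest first.
        for (String action : rollback(journal)) {
            System.out.println(action);
        }
    }
}
```

Because only steps that were actually recorded are undone, a failure early in the transaction leaves later steps untouched.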
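Here we skip the part of RegionMergeTransaction that checks the preconditions
(prepare() checks whether the two Regions belong to the same table, whether they are adjacent, the Region states, and so on).
The main routine that runs the transaction is: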
/**
 * Run the transaction.
 * @param server Hosting server instance. Can be null when testing
 * @param services Used to online/offline regions.
 * @throws IOException If thrown, transaction failed. Call
 *           {@link #rollback(Server, RegionServerServices)}
 * @return merged region
 * @throws IOException
 * @see #rollback(Server, RegionServerServices)
 */
public HRegion execute(final Server server,
    final RegionServerServices services) throws IOException {
  if (rsCoprocessorHost == null) {
    rsCoprocessorHost = server != null ?
      ((HRegionServer) server).getRegionServerCoprocessorHost() : null;
  }
  HRegion mergedRegion = createMergedRegion(server, services);
  if (rsCoprocessorHost != null) {
    rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
  }
  return stepsAfterPONR(server, services, mergedRegion);
}
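The code first looks up the RegionServer's coprocessor host (if it has not been set yet) and then calls createMergedRegion().
createMergedRegion() in turn calls stepsBeforePONR(server, services, testing).
When that is done, control returns to execute(), which runs stepsAfterPONR(server, services, mergedRegion).
PONR (point of no return) is the last state of the state machine shown above.
Once the PONR has been passed, the transaction can no longer go back to the pre-merge state: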
/**
 * Prepare the merged region and region files.
 * @param server Hosting server instance. Can be null when testing
 * @param services Used to online/offline regions.
 * @return merged region
 * @throws IOException If thrown, transaction failed. Call
 *           {@link #rollback(Server, RegionServerServices)}
 */
HRegion createMergedRegion(final Server server,
    final RegionServerServices services) throws IOException {
  LOG.info("Starting merge of " + region_a + " and "
      + region_b.getRegionNameAsString() + ", forcible=" + forcible);
  if ((server != null && server.isStopped())
      || (services != null && services.isStopping())) {
    throw new IOException("Server is stopped or stopping");
  }

  if (rsCoprocessorHost != null) {
    if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
      throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
          + this.region_b + " merge.");
    }
  }

  // If true, no cluster to write meta edits to or to use coordination.
  boolean testing = server == null ? true : server.getConfiguration()
      .getBoolean("hbase.testing.nocluster", false);

  HRegion mergedRegion = stepsBeforePONR(server, services, testing);

  @MetaMutationAnnotation
  List<Mutation> metaEntries = new ArrayList<Mutation>();
  if (rsCoprocessorHost != null) {
    if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
      throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
          + this.region_b + " merge.");
    }
    try {
      for (Mutation p : metaEntries) {
        HRegionInfo.parseRegionName(p.getRow());
      }
    } catch (IOException e) {
      LOG.error("Row key of mutation from coprocessor is not parsable as region name."
          + "Mutations from coprocessor should only be for hbase:meta table.", e);
      throw e;
    }
  }

  // This is the point of no return. Similar with SplitTransaction.
  // IF we reach the PONR then subsequent failures need to crash out this
  // regionserver
  this.journal.add(JournalEntry.PONR);

  // Add merged region and delete region_a and region_b
  // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
  // will determine whether the region is merged or not in case of failures.
  // If it is successful, master will roll-forward, if not, master will
  // rollback
  if (services != null && !services.reportRegionStateTransition(TransitionCode.MERGE_PONR,
      mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
    // Passed PONR, let SSH clean it up
    throw new IOException("Failed to notify master that merge passed PONR: "
        + region_a.getRegionInfo().getRegionNameAsString() + " and "
        + region_b.getRegionInfo().getRegionNameAsString());
  }
  return mergedRegion;
}
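The comments in createMergedRegion() distinguish failures before the PONR (the transaction can be rolled back) from failures after it (the RegionServer has to crash out). The following is a minimal sketch of that control flow under simplified assumptions: steps are just numbers, and the names PonrFlowSketch, Outcome, and run() are hypothetical, not HBase APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a transaction with a point of no return (PONR); not HBase code.
class PonrFlowSketch {
    enum Outcome { COMPLETED, ROLLED_BACK, MUST_ABORT }

    // Runs steps 1..totalSteps. The PONR is recorded right after step ponrAfterStep.
    // failAtStep is the step that throws; 0 means that no step fails.
    static Outcome run(int totalSteps, int ponrAfterStep, int failAtStep, List<String> log) {
        boolean pastPonr = false;
        try {
            for (int step = 1; step <= totalSteps; step++) {
                if (step == failAtStep) {
                    throw new IllegalStateException("step " + step + " failed");
                }
                log.add("step " + step);
                if (step == ponrAfterStep) {
                    pastPonr = true;
                    log.add("PONR");
                }
            }
            return Outcome.COMPLETED;
        } catch (IllegalStateException e) {
            log.add(e.getMessage());
            // Before the PONR the work can be undone; after it, the only safe reaction is to abort.
            return pastPonr ? Outcome.MUST_ABORT : Outcome.ROLLED_BACK;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(run(4, 2, 2, log)); // failure before the PONR was recorded
        System.out.println(run(4, 2, 3, log)); // failure after the PONR was recorded
        System.out.println(run(4, 2, 0, log)); // no failure
    }
}
```

In the real class the decision is spread over several places (the journal, rollback(), and the caller), so this sketch only captures the before/after distinction.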
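stepsBeforePONR first notifies the Master
so that the state of the Regions to be merged, Region a and Region b, is changed (READY_TO_MERGE).
It then creates the merges directory in the file system of Region a,
closes and offlines Region a and Region b, and merges their StoreFiles,
setting up reference files.
Finally, it creates the merged Region through createMergedRegionFromMerges; the Region is not opened yet at this point.
For createMergedRegionFromMerges, see the introduction to HRegion.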
public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
    boolean testing) throws IOException {
  if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE,
      mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
    throw new IOException("Failed to get ok from master to merge "
        + region_a.getRegionInfo().getRegionNameAsString() + " and "
        + region_b.getRegionInfo().getRegionNameAsString());
  }
  this.journal.add(JournalEntry.SET_MERGING);

  this.region_a.getRegionFileSystem().createMergesDir();
  this.journal.add(JournalEntry.CREATED_MERGE_DIR);

  Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
      services, this.region_a, true, testing);
  Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
      services, this.region_b, false, testing);

  assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;


  //
  // mergeStoreFiles creates merged region dirs under the region_a merges dir
  // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will
  // clean this up.
  mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);

  // Log to the journal that we are creating merged region. We could fail
  // halfway through. If we do, we could have left
  // stuff in fs that needs cleanup -- a storefile or two. Thats why we
  // add entry to journal BEFORE rather than AFTER the change.
  this.journal.add(JournalEntry.STARTED_MERGED_REGION_CREATION);
  HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
      this.region_b, this.mergedRegionInfo);
  return mergedRegion;
}
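stepsBeforePONR collects the StoreFiles of both parent Regions per column family and hands the two maps to mergeStoreFiles(). As a rough illustration of that data flow only, the sketch below combines two family-to-files maps into one map of reference names. It assumes plain String keys and file names instead of byte[] and StoreFile, and the "file.parent" naming and the class MergeStoreFilesSketch are hypothetical; the real method writes reference files to the file system instead of returning a map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration of combining the store files of two regions.
class MergeStoreFilesSketch {
    // Returns column family -> reference names, one reference per store file of either parent.
    static Map<String, List<String>> mergeStoreFiles(Map<String, List<String>> filesOfA,
                                                     Map<String, List<String>> filesOfB) {
        Map<String, List<String>> merged = new TreeMap<>();
        addReferences(merged, filesOfA, "regionA");
        addReferences(merged, filesOfB, "regionB");
        return merged;
    }

    private static void addReferences(Map<String, List<String>> merged,
                                      Map<String, List<String>> files, String parent) {
        for (Map.Entry<String, List<String>> e : files.entrySet()) {
            List<String> refs = merged.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
            for (String file : e.getValue()) {
                refs.add(file + "." + parent); // hypothetical reference naming
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> a = new TreeMap<>();
        a.put("cf1", Arrays.asList("a1", "a2"));
        Map<String, List<String>> b = new TreeMap<>();
        b.put("cf1", Arrays.asList("b1"));
        b.put("cf2", Arrays.asList("b2"));
        // prints {cf1=[a1.regionA, a2.regionA, b1.regionB], cf2=[b2.regionB]}
        System.out.println(mergeStoreFiles(a, b));
    }
}
```

The point of using references is that no data needs to be copied while the parents are offline; the merged Region can point back at the parents' files.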
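After stepsBeforePONR has finished and the PONR has been recorded, stepsAfterPONR runs.
This function simply calls openMergedRegion() to open the merged Region and bring it online,
notifies the Master so that Region a and Region b are recorded as merged into mergedRegion, and then invokes the postMerge coprocessor hook: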
public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
    HRegion mergedRegion) throws IOException {
  openMergedRegion(server, services, mergedRegion);
  if (rsCoprocessorHost != null) {
    rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
  }
  return mergedRegion;
}
References:
https://hbase.apache.org/xref/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.html
https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.html
https://hbase.apache.org/xref/org/apache/hadoop/hbase/regionserver/HRegion.html
https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/HRegion.html