Review Board 1.7.22


BOOKKEEPER-218: Provide journal manager to manage journal related operations

Review Request #4737 - Created April 16, 2012 and updated

Sijie Guo
BOOKKEEPER-218
Reviewers
bookkeeper
bookkeeper-git
Currently we put all journal related operations in Bookie class. It would be better to provide a journal manager to provide journal related operations. It would make Bookie logic more clearly.

Besides that, some admin tools like BOOKKEEPER-183 needs to provide could use JournalManager to read/check journal files directly.

 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
Diff Revision 1 Diff Revision 3
[20] 36 lines
[+20]
37
import java.util.concurrent.atomic.AtomicBoolean;
37
import java.util.concurrent.atomic.AtomicBoolean;
38

    
   
38

   
39
import org.apache.bookkeeper.meta.LedgerManager;
39
import org.apache.bookkeeper.meta.LedgerManager;
40
import org.apache.bookkeeper.meta.LedgerManagerFactory;
40
import org.apache.bookkeeper.meta.LedgerManagerFactory;
41
import org.apache.bookkeeper.bookie.BookieException;
41
import org.apache.bookkeeper.bookie.BookieException;
42
import org.apache.bookkeeper.bookie.JournalManager.JournalScanner;
42
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
43
import org.apache.bookkeeper.conf.ServerConfiguration;
43
import org.apache.bookkeeper.conf.ServerConfiguration;
44
import org.apache.bookkeeper.jmx.BKMBeanInfo;
44
import org.apache.bookkeeper.jmx.BKMBeanInfo;
45
import org.apache.bookkeeper.jmx.BKMBeanRegistry;
45
import org.apache.bookkeeper.jmx.BKMBeanRegistry;
46
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
46
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
47
import org.slf4j.Logger;
47
import org.slf4j.Logger;
[+20] [20] 17 lines
[+20] [+] public class Bookie extends Thread {
65
    final File ledgerDirectories[];
65
    final File ledgerDirectories[];
66
    final ServerConfiguration conf;
66
    final ServerConfiguration conf;
67

    
   
67

   
68
    final SyncThread syncThread;
68
    final SyncThread syncThread;
69
    final LedgerManager ledgerManager;
69
    final LedgerManager ledgerManager;

    
   
70
    final LedgerStorage ledgerStorage;

    
   
71
    final Journal journal;
70
    final HandleFactory handles;
72
    final HandleFactory handles;
71

    
   
73

   
72
    static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
74
    static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
73

    
   
75

   
74
    // ZK registration path for this bookie
76
    // ZK registration path for this bookie
[+20] [20] 11 lines
[+20] public class Bookie extends Thread {
86

    
   
88

   
87
    private int exitCode = ExitCode.OK;
89
    private int exitCode = ExitCode.OK;
88

    
   
90

   
89
    // jmx related beans
91
    // jmx related beans
90
    BookieBean jmxBookieBean;
92
    BookieBean jmxBookieBean;
91
    LedgerCacheBean jmxLedgerCacheBean;
93
    BKMBeanInfo jmxLedgerStorageBean;
92

    
   
94

   
93
    Map<Long, byte[]> masterKeyCache = Collections.synchronizedMap(new HashMap<Long, byte[]>());
95
    Map<Long, byte[]> masterKeyCache = Collections.synchronizedMap(new HashMap<Long, byte[]>());
94

    
   
96

   
95
    public static class NoLedgerException extends IOException {
97
    public static class NoLedgerException extends IOException {
96
        private static final long serialVersionUID = 1L;
98
        private static final long serialVersionUID = 1L;
[+20] [20] 31 lines
[+20] [+] public void writeComplete(int rc, long ledgerId, long entryId,
128
            LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
130
            LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
129
                      new Object[] { entryId, ledgerId, addr, rc });
131
                      new Object[] { entryId, ledgerId, addr, rc });
130
        }
132
        }
131
    }
133
    }
132

    
   
134

   
133
    JournalManager journalManager;

   
134
    EntryLogger entryLogger;

   
135
    LedgerCache ledgerCache;

   
136
    // This is the thread that garbage collects the entry logs that do not

   
137
    // contain any active ledgers in them; and compacts the entry logs that

   
138
    // has lower remaining percentage to reclaim disk space.

   
139
    final GarbageCollectorThread gcThread;

   
140

    
   

   
141
    /**
135
    /**
142
     * SyncThread is a background thread which flushes ledger index pages periodically.
136
     * SyncThread is a background thread which flushes ledger index pages periodically.
143
     * Also it takes responsibility of garbage collecting journal files.
137
     * Also it takes responsibility of garbage collecting journal files.
144
     *
138
     *
145
     * <p>
139
     * <p>
[+20] [20] 31 lines
[+20] [+] class SyncThread extends Thread {
177
        public void run() {
171
        public void run() {
178
            while(running) {
172
            while(running) {
179
                synchronized(this) {
173
                synchronized(this) {
180
                    try {
174
                    try {
181
                        wait(flushInterval);
175
                        wait(flushInterval);
182
                        if (!entryLogger.testAndClearSomethingWritten()) {
176
                        if (!ledgerStorage.isFlushRequired()) {
183
                            continue;
177
                            continue;
184
                        }
178
                        }
185
                    } catch (InterruptedException e) {
179
                    } catch (InterruptedException e) {
186
                        Thread.currentThread().interrupt();
180
                        Thread.currentThread().interrupt();
187
                        continue;
181
                        continue;
[+20] [20] 8 lines
[+20] public void run() {
196
                    // set flushing flag failed, means flushing is true now
190
                    // set flushing flag failed, means flushing is true now
197
                    // indicates another thread wants to interrupt sync thread to exit
191
                    // indicates another thread wants to interrupt sync thread to exit
198
                    break;
192
                    break;
199
                }
193
                }
200

    
   
194

   
201
                // journal manager mark log
195
                // journal mark log
202
                journalManager.markLog();
196
                journal.markLog();
203

    
   
197

   
204
                boolean flushFailed = false;
198
                boolean flushFailed = false;
205
                try {
199
                try {
206
                    ledgerCache.flushLedger(true);
200
                    ledgerStorage.flush();
207
                } catch (IOException e) {
201
                } catch (IOException e) {
208
                    LOG.error("Exception flushing Ledger", e);
202
                    LOG.error("Exception flushing Ledger", e);
209
                    flushFailed = true;
203
                    flushFailed = true;
210
                }
204
                }
211
                try {

   
212
                    entryLogger.flush();

   
213
                } catch (IOException e) {

   
214
                    LOG.error("Exception flushing entry logger", e);

   
215
                    flushFailed = true;

   
216
                }

   
217

    
   
205

   
218
                // if flush failed, we should not roll last mark, otherwise we would
206
                // if flush failed, we should not roll last mark, otherwise we would
219
                // have some ledgers are not flushed and their journal entries were lost
207
                // have some ledgers are not flushed and their journal entries were lost
220
                if (!flushFailed) {
208
                if (!flushFailed) {
221
                    journalManager.rollLog();
209
                    journal.rollLog();
222
                    journalManager.gcJournals();
210
                    journal.gcJournals();
223
                }
211
                }
224

    
   
212

   
225
                // clear flushing flag
213
                // clear flushing flag
226
                flushing.set(false);
214
                flushing.set(false);
227
            }
215
            }
[+20] [20] 116 lines
[+20] [+] public static File[] getCurrentDirectories(File[] dirs) {
344
            currentDirs[i] = getCurrentDirectory(dirs[i]);
332
            currentDirs[i] = getCurrentDirectory(dirs[i]);
345
        }
333
        }
346
        return currentDirs;
334
        return currentDirs;
347
    }
335
    }
348

    
   
336

   
349
    /**

   
350
     * Scanner used to do entry log compaction

   
351
     */

   
352
    class EntryLogCompactionScanner implements EntryLogger.EntryLogScanner {

   
353
        @Override

   
354
        public boolean accept(long ledgerId) {

   
355
            // bookie has no knowledge about which ledger is deleted

   
356
            // so just accept all ledgers.

   
357
            return true;

   
358
        }

   
359

    
   

   
360
        @Override

   
361
        public void process(long ledgerId, ByteBuffer buffer)

   
362
            throws IOException {

   
363
            try {

   
364
                Bookie.this.addEntryByLedgerId(ledgerId, buffer);

   
365
            } catch (BookieException be) {

   
366
                throw new IOException(be);

   
367
            }

   
368
        }

   
369
    }

   
370

    
   
337

   
371
    public Bookie(ServerConfiguration conf)
338
    public Bookie(ServerConfiguration conf)
372
            throws IOException, KeeperException, InterruptedException, BookieException {
339
            throws IOException, KeeperException, InterruptedException, BookieException {
373
        super("Bookie-" + conf.getBookiePort());
340
        super("Bookie-" + conf.getBookiePort());
374
        this.conf = conf;
341
        this.conf = conf;
[+20] [20] 5 lines
[+20] public void process(long ledgerId, ByteBuffer buffer) public static File[] getCurrentDirectories(File[] dirs) {
380
        checkEnvironment(this.zk);
347
        checkEnvironment(this.zk);
381

    
   
348

   
382
        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, this.zk);
349
        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, this.zk);
383

    
   
350

   
384
        syncThread = new SyncThread(conf);
351
        syncThread = new SyncThread(conf);
385
        entryLogger = new EntryLogger(conf);
352
        ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager);
386
        ledgerCache = new LedgerCacheImpl(conf, ledgerManager);
353
        handles = new HandleFactoryImpl(ledgerStorage);
387
        gcThread = new GarbageCollectorThread(conf, this.zk, ledgerCache, entryLogger,
354
        // instantiate the journal
388
                ledgerManager, new EntryLogCompactionScanner());
355
        journal = new Journal(conf);
389
        handles = new HandleFactoryImpl(entryLogger, ledgerCache);

   
390
        // instantiate the journal manager

   
391
        journalManager = new JournalManager(conf);

   
392

    
   
356

   
393
        // replay journals
357
        // replay journals
394
        readJournal();
358
        readJournal();
395
    }
359
    }
396

    
   
360

   
397
    private void readJournal() throws IOException, BookieException {
361
    private void readJournal() throws IOException, BookieException {
398
        journalManager.replayJournals(new JournalScanner() {
362
        journal.replay(new JournalScanner() {
399
            @Override
363
            @Override
400
            public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
364
            public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
401
                long ledgerId = recBuff.getLong();
365
                long ledgerId = recBuff.getLong();
402
                long entryId = recBuff.getLong();
366
                long entryId = recBuff.getLong();
403
                try {
367
                try {
[+20] [20] 11 lines
[+20] public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
415
                                    + ") is too old to hold this");
379
                                    + ") is too old to hold this");
416
                        }
380
                        }
417
                    } else {
381
                    } else {
418
                        byte[] key = masterKeyCache.get(ledgerId);
382
                        byte[] key = masterKeyCache.get(ledgerId);
419
                        if (key == null) {
383
                        if (key == null) {
420
                            key = ledgerCache.readMasterKey(ledgerId);
384
                            key = ledgerStorage.readMasterKey(ledgerId);
421
                        }
385
                        }
422
                        LedgerDescriptor handle = handles.getHandle(ledgerId, key);
386
                        LedgerDescriptor handle = handles.getHandle(ledgerId, key);
423

    
   
387

   
424
                        recBuff.rewind();
388
                        recBuff.rewind();
425
                        handle.addEntry(recBuff);
389
                        handle.addEntry(recBuff);
[+20] [20] 11 lines
[+20] public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
437
        setDaemon(true);
401
        setDaemon(true);
438
        LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
402
        LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
439
        // start bookie thread
403
        // start bookie thread
440
        super.start();
404
        super.start();
441
        syncThread.start();
405
        syncThread.start();
442
        gcThread.start();
406

   

    
   
407
        ledgerStorage.start();
443
        // set running here.
408
        // set running here.
444
        // since bookie server use running as a flag to tell bookie server whether it is alive
409
        // since bookie server use running as a flag to tell bookie server whether it is alive
445
        // if setting it in bookie thread, the watcher might run before bookie thread.
410
        // if setting it in bookie thread, the watcher might run before bookie thread.
446
        running = true;
411
        running = true;
447
        try {
412
        try {
[+20] [20] 13 lines
[+20] public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
461
        try {
426
        try {
462
            jmxBookieBean = new BookieBean(this);
427
            jmxBookieBean = new BookieBean(this);
463
            BKMBeanRegistry.getInstance().register(jmxBookieBean, parent);
428
            BKMBeanRegistry.getInstance().register(jmxBookieBean, parent);
464

    
   
429

   
465
            try {
430
            try {
466
                jmxLedgerCacheBean = this.ledgerCache.getJMXBean();
431
                jmxLedgerStorageBean = this.ledgerStorage.getJMXBean();
467
                BKMBeanRegistry.getInstance().register(jmxLedgerCacheBean, jmxBookieBean);
432
                BKMBeanRegistry.getInstance().register(jmxLedgerStorageBean, jmxBookieBean);
468
            } catch (Exception e) {
433
            } catch (Exception e) {
469
                LOG.warn("Failed to register with JMX for ledger cache", e);
434
                LOG.warn("Failed to register with JMX for ledger cache", e);
470
                jmxLedgerCacheBean = null;
435
                jmxLedgerStorageBean = null;
471
            }
436
            }
472

    
   

   
473
        } catch (Exception e) {
437
        } catch (Exception e) {
474
            LOG.warn("Failed to register with JMX", e);
438
            LOG.warn("Failed to register with JMX", e);
475
            jmxBookieBean = null;
439
            jmxBookieBean = null;
476
        }
440
        }
477
    }
441
    }
478

    
   
442

   
479
    /**
443
    /**
480
     * Unregister jmx
444
     * Unregister jmx
481
     */
445
     */
482
    public void unregisterJMX() {
446
    public void unregisterJMX() {
483
        try {
447
        try {
484
            if (jmxLedgerCacheBean != null) {
448
            if (jmxLedgerStorageBean != null) {
485
                BKMBeanRegistry.getInstance().unregister(jmxLedgerCacheBean);
449
                BKMBeanRegistry.getInstance().unregister(jmxLedgerStorageBean);
486
            }
450
            }
487
        } catch (Exception e) {
451
        } catch (Exception e) {
488
            LOG.warn("Failed to unregister with JMX", e);
452
            LOG.warn("Failed to unregister with JMX", e);
489
        }
453
        }
490
        try {
454
        try {
491
            if (jmxBookieBean != null) {
455
            if (jmxBookieBean != null) {
492
                BKMBeanRegistry.getInstance().unregister(jmxBookieBean);
456
                BKMBeanRegistry.getInstance().unregister(jmxBookieBean);
493
            }
457
            }
494
        } catch (Exception e) {
458
        } catch (Exception e) {
495
            LOG.warn("Failed to unregister with JMX", e);
459
            LOG.warn("Failed to unregister with JMX", e);
496
        }
460
        }
497
        jmxBookieBean = null;
461
        jmxBookieBean = null;
498
        jmxLedgerCacheBean = null;
462
        jmxLedgerStorageBean = null;
499
    }
463
    }
500

    
   
464

   
501

    
   
465

   
502
    /**
466
    /**
503
     * Instantiate the ZooKeeper client for the Bookie.
467
     * Instantiate the ZooKeeper client for the Bookie.
[+20] [20] 85 lines
[+20] [+] public boolean isRunning() {
589

    
   
553

   
590
    @Override
554
    @Override
591
    public void run() {
555
    public void run() {
592
        // bookie thread wait for journal thread
556
        // bookie thread wait for journal thread
593
        try {
557
        try {
594
            // start journal manager
558
            // start journal
595
            journalManager.start();
559
            journal.start();
596
            // wait until journal manager quits
560
            // wait until journal quits
597
            journalManager.join();
561
            journal.join();
598
        } catch (InterruptedException ie) {
562
        } catch (InterruptedException ie) {
599
        }
563
        }
600
        // if the journal thread quits due to shutting down, it is ok
564
        // if the journal thread quits due to shutting down, it is ok
601
        if (!shuttingdown) {
565
        if (!shuttingdown) {
602
            // some error found in journal thread and it quits
566
            // some error found in journal thread and it quits
[+20] [20] 17 lines
[+20] [+] public int shutdown() {
620
            if (running) { // avoid shutdown twice
584
            if (running) { // avoid shutdown twice
621
                // the exitCode only set when first shutdown usually due to exception found
585
                // the exitCode only set when first shutdown usually due to exception found
622
                this.exitCode = exitCode;
586
                this.exitCode = exitCode;
623
                // mark bookie as in shutting down progress
587
                // mark bookie as in shutting down progress
624
                shuttingdown = true;
588
                shuttingdown = true;
625
                // shut down gc thread, which depends on zookeeper client
589

   
626
                // also compaction will write entries again to entry log file
590
                // Shutdown the EntryLogger which has the GarbageCollector Thread running
627
                gcThread.shutdown();
591
                ledgerStorage.shutdown();

    
   
592

   
628
                // Shutdown the ZK client
593
                // Shutdown the ZK client
629
                if(zk != null) zk.close();
594
                if(zk != null) zk.close();
630
                // Shutdown journal manager
595
                // Shutdown journal
631
                journalManager.shutdown();
596
                journal.shutdown();
632
                this.join();
597
                this.join();
633
                syncThread.shutdown();
598
                syncThread.shutdown();
634

    
   
599

   
635
                // Shutdown the EntryLogger which has the GarbageCollector Thread running

   
636
                entryLogger.shutdown();

   
637
                // close Ledger Manager
600
                // close Ledger Manager
638
                ledgerManager.close();
601
                ledgerManager.close();
639
                // setting running to false here, so watch thread in bookie server know it only after bookie shut down
602
                // setting running to false here, so watch thread in bookie server know it only after bookie shut down
640
                running = false;
603
                running = false;
641
            }
604
            }
642
        } catch (InterruptedException ie) {
605
        } catch (InterruptedException ie) {
643
            LOG.error("Interrupted during shutting down bookie : ", ie);
606
            LOG.error("Interrupted during shutting down bookie : ", ie);
644
        }
607
        }
645
        return exitCode;
608
        return this.exitCode;
646
    }
609
    }
647

    
   
610

   
648
    /** 
611
    /** 
649
     * Retrieve the ledger descriptor for the ledger which entry should be added to.
612
     * Retrieve the ledger descriptor for the ledger which entry should be added to.
650
     * The LedgerDescriptor returned from this method should be eventually freed with 
613
     * The LedgerDescriptor returned from this method should be eventually freed with 
[+20] [20] 12 lines
[+20] [+] private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, byte[] masterKey)
663
            bb.putLong(METAENTRY_ID_LEDGER_KEY);
626
            bb.putLong(METAENTRY_ID_LEDGER_KEY);
664
            bb.putInt(masterKey.length);
627
            bb.putInt(masterKey.length);
665
            bb.put(masterKey);
628
            bb.put(masterKey);
666
            bb.flip();
629
            bb.flip();
667

    
   
630

   
668
            journalManager.logAddEntry(bb, ledgerId, METAENTRY_ID_LEDGER_KEY,
631
            journal.logAddEntry(bb, new NopWriteCallback(), null);
669
                                       new NopWriteCallback(), null);

   
670
            masterKeyCache.put(ledgerId, masterKey);
632
            masterKeyCache.put(ledgerId, masterKey);
671
        }
633
        }
672
        return l;
634
        return l;
673
    }
635
    }
674

    
   
636

   
675
    protected void addEntryByLedgerId(long ledgerId, ByteBuffer entry)
637
    protected void addEntryByLedgerId(long ledgerId, ByteBuffer entry)
676
        throws IOException, BookieException {
638
        throws IOException, BookieException {
677
        byte[] key = ledgerCache.readMasterKey(ledgerId);
639
        byte[] key = ledgerStorage.readMasterKey(ledgerId);
678
        LedgerDescriptor handle = handles.getHandle(ledgerId, key);
640
        LedgerDescriptor handle = handles.getHandle(ledgerId, key);
679
        handle.addEntry(entry);
641
        handle.addEntry(entry);
680
    }
642
    }
681

    
   
643

   
682
    /**
644
    /**
[+20] [20] 7 lines
[+20] [+] private void addEntryInternal(LedgerDescriptor handle, ByteBuffer entry, WriteCallback cb, Object ctx)
690

    
   
652

   
691
        entry.rewind();
653
        entry.rewind();
692
        if (LOG.isTraceEnabled()) {
654
        if (LOG.isTraceEnabled()) {
693
            LOG.trace("Adding " + entryId + "@" + ledgerId);
655
            LOG.trace("Adding " + entryId + "@" + ledgerId);
694
        }
656
        }
695
        journalManager.logAddEntry(entry, ledgerId, entryId, cb, ctx);
657
        journal.logAddEntry(entry, cb, ctx);
696
    }
658
    }
697

    
   
659

   
698
    /**
660
    /**
699
     * Add entry to a ledger, even if the ledger has previous been fenced. This should only
661
     * Add entry to a ledger, even if the ledger has previous been fenced. This should only
700
     * happen in bookie recovery or ledger recovery cases, where entries are being replicates 
662
     * happen in bookie recovery or ledger recovery cases, where entries are being replicates 
[+20] [20] 97 lines
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java
Diff Revision 1 Diff Revision 3
 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
Diff Revision 1 Diff Revision 3
 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalManager.java
Diff Revision 1 Diff Revision 3 - File Reverted
 
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
Diff Revision 1 Diff Revision 3
 
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
Diff Revision 1 Diff Revision 3
 
  1. bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java: Loading...
  2. bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java: Loading...
  3. bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java: Loading...
  4. bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalManager.java: Loading...
  5. bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java: Loading...
  6. bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java: Loading...