Review Board 1.7.22


BOOKKEEPER-218: Provide journal manager to manage journal related operations

Review Request #4737 - Created April 16, 2012 and updated

Sijie Guo
BOOKKEEPER-218
Reviewers
bookkeeper
bookkeeper-git
Currently we put all journal related operations in Bookie class. It would be better to provide a journal manager to provide journal related operations. It would make Bookie logic more clearly.

Besides that, some admin tools like BOOKKEEPER-183 needs to provide could use JournalManager to read/check journal files directly.

 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
Revision 128fc09 New Change
[20] 18 lines
[+20]
19
 *
19
 *
20
 */
20
 */
21

    
   
21

   
22
package org.apache.bookkeeper.bookie;
22
package org.apache.bookkeeper.bookie;
23

    
   
23

   
24
import java.io.BufferedReader;

   
25
import java.io.BufferedWriter;

   
26
import java.io.InputStreamReader;

   
27
import java.io.OutputStreamWriter;

   
28
import java.io.File;
24
import java.io.File;
29
import java.io.FileInputStream;

   
30
import java.io.FileNotFoundException;
25
import java.io.FileNotFoundException;
31
import java.io.FileOutputStream;

   
32
import java.io.IOException;
26
import java.io.IOException;
33
import java.io.RandomAccessFile;

   
34
import java.io.FilenameFilter;
27
import java.io.FilenameFilter;
35
import java.net.InetAddress;
28
import java.net.InetAddress;
36
import java.net.InetSocketAddress;
29
import java.net.InetSocketAddress;
37
import java.net.UnknownHostException;
30
import java.net.UnknownHostException;
38
import java.nio.ByteBuffer;
31
import java.nio.ByteBuffer;
39
import java.nio.channels.FileChannel;

   
40
import java.util.ArrayList;
32
import java.util.ArrayList;
41
import java.util.Collections;
33
import java.util.Collections;
42
import java.util.Map;
34
import java.util.Map;
43
import java.util.HashMap;
35
import java.util.HashMap;
44
import java.util.LinkedList;

   
45
import java.util.List;
36
import java.util.List;
46
import java.util.concurrent.LinkedBlockingQueue;

   
47
import java.util.concurrent.atomic.AtomicBoolean;
37
import java.util.concurrent.atomic.AtomicBoolean;
48

    
   
38

   
49
import org.apache.bookkeeper.meta.LedgerManager;
39
import org.apache.bookkeeper.meta.LedgerManager;
50
import org.apache.bookkeeper.meta.LedgerManagerFactory;
40
import org.apache.bookkeeper.meta.LedgerManagerFactory;
51
import org.apache.bookkeeper.bookie.BookieException;
41
import org.apache.bookkeeper.bookie.BookieException;

    
   
42
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
52
import org.apache.bookkeeper.conf.ServerConfiguration;
43
import org.apache.bookkeeper.conf.ServerConfiguration;
53
import org.apache.bookkeeper.jmx.BKMBeanInfo;
44
import org.apache.bookkeeper.jmx.BKMBeanInfo;
54
import org.apache.bookkeeper.jmx.BKMBeanRegistry;
45
import org.apache.bookkeeper.jmx.BKMBeanRegistry;
55
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
46
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
56
import org.slf4j.Logger;
47
import org.slf4j.Logger;
[+20] [20] 11 lines
[+20]
68
 */
59
 */
69

    
   
60

   
70
public class Bookie extends Thread {
61
public class Bookie extends Thread {
71
    static Logger LOG = LoggerFactory.getLogger(Bookie.class);
62
    static Logger LOG = LoggerFactory.getLogger(Bookie.class);
72

    
   
63

   
73
    final static long MB = 1024 * 1024L;

   
74
    // max journal file size

   
75
    final long maxJournalSize;

   
76
    // number journal files kept before marked journal

   
77
    final int maxBackupJournals;

   
78

    
   

   
79
    final File journalDirectory;
64
    final File journalDirectory;
80

    
   

   
81
    final File ledgerDirectories[];
65
    final File ledgerDirectories[];
82

    
   

   
83
    final ServerConfiguration conf;
66
    final ServerConfiguration conf;
84

    
   
67

   
85
    final SyncThread syncThread;
68
    final SyncThread syncThread;
86
    final LedgerManager ledgerManager;
69
    final LedgerManager ledgerManager;
87
    final LedgerStorage ledgerStorage;
70
    final LedgerStorage ledgerStorage;

    
   
71
    final Journal journal;
88
    final HandleFactory handles;
72
    final HandleFactory handles;
89

    
   
73

   
90
    static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
74
    static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
91

    
   
75

   
92
    // ZK registration path for this bookie
76
    // ZK registration path for this bookie
[+20] [20] 43 lines
[+20] [+] public long getLedger() {
136
        public long getEntry() {
120
        public long getEntry() {
137
            return entryId;
121
            return entryId;
138
        }
122
        }
139
    }
123
    }
140

    
   
124

   

    
   
125
    // Write Callback do nothing

    
   
126
    class NopWriteCallback implements WriteCallback {

    
   
127
        @Override

    
   
128
        public void writeComplete(int rc, long ledgerId, long entryId,

    
   
129
                                  InetSocketAddress addr, Object ctx) {

    
   
130
            LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",

    
   
131
                      new Object[] { entryId, ledgerId, addr, rc });

    
   
132
        }

    
   
133
    }
141

    
   
134

   
142
    /**
135
    /**
143
     * SyncThread is a background thread which flushes ledger index pages periodically.
136
     * SyncThread is a background thread which flushes ledger index pages periodically.
144
     * Also it takes responsibility of garbage collecting journal files.
137
     * Also it takes responsibility of garbage collecting journal files.
145
     *
138
     *
[+20] [20] 51 lines
[+20] [+] public void run() {
197
                    // set flushing flag failed, means flushing is true now
190
                    // set flushing flag failed, means flushing is true now
198
                    // indicates another thread wants to interrupt sync thread to exit
191
                    // indicates another thread wants to interrupt sync thread to exit
199
                    break;
192
                    break;
200
                }
193
                }
201

    
   
194

   
202
                lastLogMark.markLog();
195
                // journal mark log

    
   
196
                journal.markLog();
203

    
   
197

   
204
                boolean flushFailed = false;
198
                boolean flushFailed = false;
205
                try {
199
                try {
206
                    ledgerStorage.flush();
200
                    ledgerStorage.flush();
207
                } catch (IOException e) {
201
                } catch (IOException e) {
208
                    LOG.error("Exception flushing Ledger", e);
202
                    LOG.error("Exception flushing Ledger", e);
209
                    flushFailed = true;
203
                    flushFailed = true;
210
                }
204
                }
211

    
   
205

   
212
                // if flush failed, we should not roll last mark, otherwise we would
206
                // if flush failed, we should not roll last mark, otherwise we would
213
                // have some ledgers are not flushed and their journal entries were lost
207
                // have some ledgers are not flushed and their journal entries were lost
214
                if (!flushFailed) {
208
                if (!flushFailed) {
215

    
   
209
                    journal.rollLog();
216
                    lastLogMark.rollLog();
210
                    journal.gcJournals();
217

    
   

   
218
                    // list the journals that have been marked

   
219
                    List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {

   
220
                        @Override

   
221
                        public boolean accept(long journalId) {

   
222
                            if (journalId < lastLogMark.lastMark.txnLogId) {

   
223
                                return true;

   
224
                            } else {

   
225
                                return false;

   
226
                            }

   
227
                        }

   
228
                    });

   
229

    
   

   
230
                    // keep MAX_BACKUP_JOURNALS journal files before marked journal

   
231
                    if (logs.size() >= maxBackupJournals) {

   
232
                        int maxIdx = logs.size() - maxBackupJournals;

   
233
                        for (int i=0; i<maxIdx; i++) {

   
234
                            long id = logs.get(i);

   
235
                            // make sure the journal id is smaller than marked journal id

   
236
                            if (id < lastLogMark.lastMark.txnLogId) {

   
237
                                File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");

   
238
                                journalFile.delete();

   
239
                                LOG.info("garbage collected journal " + journalFile.getName());

   
240
                            }

   
241
                        }

   
242
                    }

   
243

    
   

   
244
                }
211
                }
245

    
   
212

   
246
                // clear flushing flag
213
                // clear flushing flag
247
                flushing.set(false);
214
                flushing.set(false);
248
            }
215
            }
[+20] [20] 123 lines
[+20] [+] public static File[] getCurrentDirectories(File[] dirs) {
372
            throws IOException, KeeperException, InterruptedException, BookieException {
339
            throws IOException, KeeperException, InterruptedException, BookieException {
373
        super("Bookie-" + conf.getBookiePort());
340
        super("Bookie-" + conf.getBookiePort());
374
        this.conf = conf;
341
        this.conf = conf;
375
        this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
342
        this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
376
        this.ledgerDirectories = getCurrentDirectories(conf.getLedgerDirs());
343
        this.ledgerDirectories = getCurrentDirectories(conf.getLedgerDirs());
377
        this.maxJournalSize = conf.getMaxJournalSize() * MB;

   
378
        this.maxBackupJournals = conf.getMaxBackupJournals();

   
379

    
   
344

   
380
        // instantiate zookeeper client to initialize ledger manager
345
        // instantiate zookeeper client to initialize ledger manager
381
        this.zk = instantiateZookeeperClient(conf);
346
        this.zk = instantiateZookeeperClient(conf);
382
        checkEnvironment(this.zk);
347
        checkEnvironment(this.zk);
383

    
   
348

   
384
        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, this.zk);
349
        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, this.zk);
385

    
   
350

   
386
        syncThread = new SyncThread(conf);
351
        syncThread = new SyncThread(conf);
387
        ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager);
352
        ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager);
388

    
   

   
389
        handles = new HandleFactoryImpl(ledgerStorage);
353
        handles = new HandleFactoryImpl(ledgerStorage);

    
   
354
        // instantiate the journal

    
   
355
        journal = new Journal(conf);
390

    
   
356

   
391
        // replay journals
357
        // replay journals
392
        readJournal();
358
        readJournal();
393
    }
359
    }
394

    
   
360

   
395
    private void readJournal() throws IOException, BookieException {
361
    private void readJournal() throws IOException, BookieException {
396
        lastLogMark.readLog();
362
        journal.replay(new JournalScanner() {
397
        if (LOG.isDebugEnabled()) {

   
398
            LOG.debug("Last Log Mark : " + lastLogMark);

   
399
        }

   
400
        final long markedLogId = lastLogMark.txnLogId;

   
401
        List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {

   
402
            @Override
363
            @Override
403
            public boolean accept(long journalId) {
364
            public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
404
                if (journalId < markedLogId) {

   
405
                    return false;

   
406
                }

   
407
                return true;

   
408
            }

   
409
        });

   
410
        // last log mark may be missed due to no sync up before

   
411
        // validate filtered log ids only when we have markedLogId

   
412
        if (markedLogId > 0) {

   
413
            if (logs.size() == 0 || logs.get(0) != markedLogId) {

   
414
                throw new IOException("Recovery log " + markedLogId + " is missing");

   
415
            }

   
416
        }

   
417
        if (LOG.isDebugEnabled()) {

   
418
            LOG.debug("Try to relay journal logs : " + logs);

   
419
        }

   
420
        // TODO: When reading in the journal logs that need to be synced, we

   
421
        // should use BufferedChannels instead to minimize the amount of

   
422
        // system calls done.

   
423
        ByteBuffer lenBuff = ByteBuffer.allocate(4);

   
424
        ByteBuffer recBuff = ByteBuffer.allocate(64*1024);

   
425
        for(Long id: logs) {

   
426
            JournalChannel recLog;

   
427
            if(id == markedLogId) {

   
428
                long markedLogPosition = lastLogMark.txnLogPosition;

   
429
                recLog = new JournalChannel(journalDirectory, id, markedLogPosition);

   
430
            } else {

   
431
                recLog = new JournalChannel(journalDirectory, id);

   
432
            }

   
433

    
   

   
434
            while(true) {

   
435
                lenBuff.clear();

   
436
                fullRead(recLog, lenBuff);

   
437
                if (lenBuff.remaining() != 0) {

   
438
                    break;

   
439
                }

   
440
                lenBuff.flip();

   
441
                int len = lenBuff.getInt();

   
442
                if (len == 0) {

   
443
                    break;

   
444
                }

   
445
                recBuff.clear();

   
446
                if (recBuff.remaining() < len) {

   
447
                    recBuff = ByteBuffer.allocate(len);

   
448
                }

   
449
                recBuff.limit(len);

   
450
                if (fullRead(recLog, recBuff) != len) {

   
451
                    // This seems scary, but it just means that this is where we

   
452
                    // left off writing

   
453
                    break;

   
454
                }

   
455
                recBuff.flip();

   
456
                long ledgerId = recBuff.getLong();
365
                long ledgerId = recBuff.getLong();
457
                long entryId = recBuff.getLong();
366
                long entryId = recBuff.getLong();
458
                try {
367
                try {
459
                    LOG.debug("Replay journal - ledger id : {}, entry id : {}.", ledgerId, entryId);
368
                    LOG.debug("Replay journal - ledger id : {}, entry id : {}.", ledgerId, entryId);
460
                    if (entryId == METAENTRY_ID_LEDGER_KEY) {
369
                    if (entryId == METAENTRY_ID_LEDGER_KEY) {
461
                        if (recLog.getFormatVersion() >= 3) {
370
                        if (journalVersion >= 3) {
462
                            int masterKeyLen = recBuff.getInt();
371
                            int masterKeyLen = recBuff.getInt();
463
                            byte[] masterKey = new byte[masterKeyLen];
372
                            byte[] masterKey = new byte[masterKeyLen];
464

    
   
373

   
465
                            recBuff.get(masterKey);
374
                            recBuff.get(masterKey);
466
                            masterKeyCache.put(ledgerId, masterKey);
375
                            masterKeyCache.put(ledgerId, masterKey);
467
                        } else {
376
                        } else {
468
                            throw new IOException("Invalid journal. Contains journalKey "
377
                            throw new IOException("Invalid journal. Contains journalKey "
469
                                    + " but layout version (" + recLog.getFormatVersion()
378
                                    + " but layout version (" + journalVersion
470
                                    + ") is too old to hold this");
379
                                    + ") is too old to hold this");
471
                        }
380
                        }
472
                    } else {
381
                    } else {
473
                        byte[] key = masterKeyCache.get(ledgerId);
382
                        byte[] key = masterKeyCache.get(ledgerId);
474
                        if (key == null) {
383
                        if (key == null) {
[+20] [20] 4 lines
[+20] public boolean accept(long journalId) { public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
479
                        recBuff.rewind();
388
                        recBuff.rewind();
480
                        handle.addEntry(recBuff);
389
                        handle.addEntry(recBuff);
481
                    }
390
                    }
482
                } catch (NoLedgerException nsle) {
391
                } catch (NoLedgerException nsle) {
483
                    LOG.debug("Skip replaying entries of ledger {} since it was deleted.", ledgerId);
392
                    LOG.debug("Skip replaying entries of ledger {} since it was deleted.", ledgerId);
484
                    continue;
393
                } catch (BookieException be) {

    
   
394
                    throw new IOException(be);
485
                }
395
                }
486
            }
396
            }
487
            recLog.close();
397
        });
488
        }

   
489
    }
398
    }
490

    
   
399

   
491
    synchronized public void start() {
400
    synchronized public void start() {
492
        setDaemon(true);
401
        setDaemon(true);
493
        LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
402
        LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());

    
   
403
        // start bookie thread
494
        super.start();
404
        super.start();
495
        syncThread.start();
405
        syncThread.start();
496

    
   
406

   
497
        ledgerStorage.start();
407
        ledgerStorage.start();
498
        // set running here.
408
        // set running here.
[+20] [20] 6 lines
[+20] public boolean accept(long journalId) { public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
505
            LOG.error("Couldn't register bookie with zookeeper, shutting down", e);
415
            LOG.error("Couldn't register bookie with zookeeper, shutting down", e);
506
            shutdown(ExitCode.ZK_REG_FAIL);
416
            shutdown(ExitCode.ZK_REG_FAIL);
507
        }
417
        }
508
    }
418
    }
509

    
   
419

   
510
    public static interface JournalIdFilter {

   
511
        public boolean accept(long journalId);

   
512
    }

   
513

    
   

   
514
    /**

   
515
     * List all journal ids by a specified journal id filer

   
516
     *

   
517
     * @param journalDir journal dir

   
518
     * @param filter journal id filter

   
519
     * @return list of filtered ids

   
520
     */

   
521
    public static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {

   
522
        File logFiles[] = journalDir.listFiles();

   
523
        List<Long> logs = new ArrayList<Long>();

   
524
        for(File f: logFiles) {

   
525
            String name = f.getName();

   
526
            if (!name.endsWith(".txn")) {

   
527
                continue;

   
528
            }

   
529
            String idString = name.split("\\.")[0];

   
530
            long id = Long.parseLong(idString, 16);

   
531
            if (filter != null) {

   
532
                if (filter.accept(id)) {

   
533
                    logs.add(id);

   
534
                }

   
535
            } else {

   
536
                logs.add(id);

   
537
            }

   
538
        }

   
539
        Collections.sort(logs);

   
540
        return logs;

   
541
    }

   
542

    
   

   
543
    /**
420
    /**
544
     * Register jmx with parent
421
     * Register jmx with parent
545
     *
422
     *
546
     * @param parent parent bk mbean info
423
     * @param parent parent bk mbean info
547
     */
424
     */
[+20] [20] 120 lines
[+20] [+] public void process(WatchedEvent event) {
668
        });
545
        });
669
        isZkExpired = false;
546
        isZkExpired = false;
670
        return newZk;
547
        return newZk;
671
    }
548
    }
672

    
   
549

   
673
    private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {

   
674
        int total = 0;

   
675
        while(bb.remaining() > 0) {

   
676
            int rc = fc.read(bb);

   
677
            if (rc <= 0) {

   
678
                return total;

   
679
            }

   
680
            total += rc;

   
681
        }

   
682
        return total;

   
683
    }

   
684

    
   

   
685
    static class QueueEntry {

   
686
        QueueEntry(ByteBuffer entry, long ledgerId, long entryId,

   
687
                   WriteCallback cb, Object ctx) {

   
688
            this.entry = entry.duplicate();

   
689
            this.cb = cb;

   
690
            this.ctx = ctx;

   
691
            this.ledgerId = ledgerId;

   
692
            this.entryId = entryId;

   
693
        }

   
694

    
   

   
695
        ByteBuffer entry;

   
696

    
   

   
697
        long ledgerId;

   
698

    
   

   
699
        long entryId;

   
700

    
   

   
701
        WriteCallback cb;

   
702

    
   

   
703
        Object ctx;

   
704
    }

   
705

    
   

   
706
    LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();

   
707

    
   

   
708
    class LastLogMark {

   
709
        long txnLogId;

   
710
        long txnLogPosition;

   
711
        LastLogMark lastMark;

   
712
        LastLogMark(long logId, long logPosition) {

   
713
            this.txnLogId = logId;

   
714
            this.txnLogPosition = logPosition;

   
715
        }

   
716
        synchronized void setLastLogMark(long logId, long logPosition) {

   
717
            txnLogId = logId;

   
718
            txnLogPosition = logPosition;

   
719
        }

   
720
        synchronized void markLog() {

   
721
            lastMark = new LastLogMark(txnLogId, txnLogPosition);

   
722
        }

   
723
        synchronized void rollLog() {

   
724
            byte buff[] = new byte[16];

   
725
            ByteBuffer bb = ByteBuffer.wrap(buff);

   
726
            // we should record <logId, logPosition> marked in markLog

   
727
            // which is safe since records before lastMark have been

   
728
            // persisted to disk (both index & entry logger)

   
729
            bb.putLong(lastMark.txnLogId);

   
730
            bb.putLong(lastMark.txnLogPosition);

   
731
            if (LOG.isDebugEnabled()) {

   
732
                LOG.debug("RollLog to persist last marked log : " + lastMark);

   
733
            }

   
734
            for(File dir: ledgerDirectories) {

   
735
                File file = new File(dir, "lastMark");

   
736
                try {

   
737
                    FileOutputStream fos = new FileOutputStream(file);

   
738
                    fos.write(buff);

   
739
                    fos.getChannel().force(true);

   
740
                    fos.close();

   
741
                } catch (IOException e) {

   
742
                    LOG.error("Problems writing to " + file, e);

   
743
                }

   
744
            }

   
745
        }

   
746

    
   

   
747
        /**

   
748
         * Read last mark from lastMark file.

   
749
         * The last mark should first be max journal log id,

   
750
         * and then max log position in max journal log.

   
751
         */

   
752
        synchronized void readLog() {

   
753
            byte buff[] = new byte[16];

   
754
            ByteBuffer bb = ByteBuffer.wrap(buff);

   
755
            for(File dir: ledgerDirectories) {

   
756
                File file = new File(dir, "lastMark");

   
757
                try {

   
758
                    FileInputStream fis = new FileInputStream(file);

   
759
                    fis.read(buff);

   
760
                    fis.close();

   
761
                    bb.clear();

   
762
                    long i = bb.getLong();

   
763
                    long p = bb.getLong();

   
764
                    if (i > txnLogId) {

   
765
                        txnLogId = i;

   
766
                        if(p > txnLogPosition) {

   
767
                          txnLogPosition = p;

   
768
                        }

   
769
                    }

   
770
                } catch (IOException e) {

   
771
                    LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");

   
772
                }

   
773
            }

   
774
        }

   
775

    
   

   
776
        @Override

   
777
        public String toString() {

   
778
            StringBuilder sb = new StringBuilder();

   
779
            

   
780
            sb.append("LastMark: logId - ").append(txnLogId)

   
781
              .append(" , position - ").append(txnLogPosition);

   
782
            

   
783
            return sb.toString();

   
784
        }

   
785
    }

   
786

    
   

   
787
    private LastLogMark lastLogMark = new LastLogMark(0, 0);

   
788

    
   

   
789
    LastLogMark getLastLogMark() {

   
790
        return lastLogMark;

   
791
    }

   
792

    
   

   
793
    public boolean isRunning() {
550
    public boolean isRunning() {
794
        return running;
551
        return running;
795
    }
552
    }
796

    
   
553

   
797
    /**

   
798
     * A thread used for persisting journal entries to journal files.

   
799
     * 

   
800
     * <p>

   
801
     * Besides persisting journal entries, it also takes responsibility of

   
802
     * rolling journal files when a journal file reaches journal file size

   
803
     * limitation.

   
804
     * </p>

   
805
     * <p>

   
806
     * During journal rolling, it first closes the writing journal, generates

   
807
     * new journal file using current timestamp, and continue persistence logic.

   
808
     * Those journals will be garbage collected in SyncThread.

   
809
     * </p>

   
810
     */

   
811
    @Override
554
    @Override
812
    public void run() {
555
    public void run() {
813
        LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
556
        // bookie thread wait for journal thread
814
        ByteBuffer lenBuff = ByteBuffer.allocate(4);

   
815
        try {
557
        try {
816
            long logId = 0;
558
            // start journal
817
            JournalChannel logFile = null;
559
            journal.start();
818
            BufferedChannel bc = null;
560
            // wait until journal quits
819
            long nextPrealloc = 0;
561
            journal.join();
820
            long lastFlushPosition = 0;
562
        } catch (InterruptedException ie) {
821

    
   

   
822
            QueueEntry qe = null;

   
823
            while (true) {

   
824
                // new journal file to write

   
825
                if (null == logFile) {

   
826
                    logId = System.currentTimeMillis();

   
827
                    logFile = new JournalChannel(journalDirectory, logId);

   
828
                    bc = logFile.getBufferedChannel();

   
829

    
   

   
830
                    lastFlushPosition = 0;

   
831
                }

   
832

    
   

   
833
                if (qe == null) {

   
834
                    if (toFlush.isEmpty()) {

   
835
                        qe = queue.take();

   
836
                    } else {

   
837
                        qe = queue.poll();

   
838
                        if (qe == null || bc.position() > lastFlushPosition + 512*1024) {

   
839
                            //logFile.force(false);

   
840
                            bc.flush(true);

   
841
                            lastFlushPosition = bc.position();

   
842
                            lastLogMark.setLastLogMark(logId, lastFlushPosition);

   
843
                            for (QueueEntry e : toFlush) {

   
844
                                e.cb.writeComplete(0, e.ledgerId, e.entryId, null, e.ctx);

   
845
                            }

   
846
                            toFlush.clear();

   
847

    
   

   
848
                            // check whether journal file is over file limit

   
849
                            if (bc.position() > maxJournalSize) {

   
850
                                logFile.close();

   
851
                                logFile = null;

   
852
                                continue;

   
853
                            }

   
854
                        }

   
855
                    }

   
856
                }

   
857

    
   

   
858
                if (isZkExpired) {

   
859
                    LOG.warn("Exiting... zk client has expired.");

   
860
                    break;

   
861
                }

   
862
                if (qe == null) { // no more queue entry

   
863
                    continue;

   
864
                }

   
865
                lenBuff.clear();

   
866
                lenBuff.putInt(qe.entry.remaining());

   
867
                lenBuff.flip();

   
868
                //

   
869
                // we should be doing the following, but then we run out of

   
870
                // direct byte buffers

   
871
                // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });

   
872
                bc.write(lenBuff);

   
873
                bc.write(qe.entry);

   
874

    
   

   
875
                logFile.preAllocIfNeeded();

   
876

    
   

   
877
                toFlush.add(qe);

   
878
                qe = null;

   
879
            }
563
        }
880
        } catch (Exception e) {
564
        // if the journal thread quits due to shutting down, it is ok
881
            // if the bookie thread quits due to shutting down, it is ok
565
        if (!shuttingdown) {
882
            if (shuttingdown) {
566
            // some error found in journal thread and it quits
883
                LOG.warn("Bookie thread exits when shutting down", e);

   
884
            } else {

   
885
                // some error found in bookie thread and it quits

   
886
                // following add operations to it would hang unit client timeout
567
            // following add operations to it would hang unit client timeout
887
                // so we should let bookie server exists
568
            // so we should let bookie server exists
888
                LOG.error("Exception occurred in bookie thread and it quits : ", e);
569
            LOG.error("Journal manager quits unexpectedly.");
889
                shutdown(ExitCode.BOOKIE_EXCEPTION);
570
            shutdown(ExitCode.BOOKIE_EXCEPTION);
890
            }
571
        }
891
        }
572
    }
892
    }

   
893

    
   
573

   
894
    // provided a public shutdown method for other caller
574
    // provided a public shutdown method for other caller
895
    // to shut down bookie gracefully
575
    // to shut down bookie gracefully
896
    public int shutdown() {
576
    public int shutdown() {
897
        return shutdown(ExitCode.OK);
577
        return shutdown(ExitCode.OK);
[+20] [20] 12 lines
[+20] public int shutdown() {
910
                // Shutdown the EntryLogger which has the GarbageCollector Thread running
590
                // Shutdown the EntryLogger which has the GarbageCollector Thread running
911
                ledgerStorage.shutdown();
591
                ledgerStorage.shutdown();
912

    
   
592

   
913
                // Shutdown the ZK client
593
                // Shutdown the ZK client
914
                if(zk != null) zk.close();
594
                if(zk != null) zk.close();
915
                this.interrupt();
595
                // Shutdown journal

    
   
596
                journal.shutdown();
916
                this.join();
597
                this.join();
917
                syncThread.shutdown();
598
                syncThread.shutdown();
918

    
   
599

   
919
                // close Ledger Manager
600
                // close Ledger Manager
920
                ledgerManager.close();
601
                ledgerManager.close();
[+20] [20] 24 lines
[+20] [+] private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, byte[] masterKey)
945
            bb.putLong(METAENTRY_ID_LEDGER_KEY);
626
            bb.putLong(METAENTRY_ID_LEDGER_KEY);
946
            bb.putInt(masterKey.length);
627
            bb.putInt(masterKey.length);
947
            bb.put(masterKey);
628
            bb.put(masterKey);
948
            bb.flip();
629
            bb.flip();
949

    
   
630

   
950
            queue.add(new QueueEntry(bb,
631
            journal.logAddEntry(bb, new NopWriteCallback(), null);
951
                                     ledgerId, METAENTRY_ID_LEDGER_KEY,

   
952
                                     new WriteCallback() {

   
953
                                         public void writeComplete(int rc, long ledgerId, 

   
954
                                                 long entryId, InetSocketAddress addr,

   
955
                                                 Object ctx) {

   
956
                                             // do nothing

   
957
                                         }

   
958
                                     }, null));

   
959
            masterKeyCache.put(ledgerId, masterKey);
632
            masterKeyCache.put(ledgerId, masterKey);
960
        }
633
        }
961
        return l;
634
        return l;
962
    }
635
    }
963

    
   
636

   
[+20] [20] 15 lines
[+20] [+] private void addEntryInternal(LedgerDescriptor handle, ByteBuffer entry, WriteCallback cb, Object ctx)
979

    
   
652

   
980
        entry.rewind();
653
        entry.rewind();
981
        if (LOG.isTraceEnabled()) {
654
        if (LOG.isTraceEnabled()) {
982
            LOG.trace("Adding " + entryId + "@" + ledgerId);
655
            LOG.trace("Adding " + entryId + "@" + ledgerId);
983
        }
656
        }
984
        queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
657
        journal.logAddEntry(entry, cb, ctx);
985
    }
658
    }
986

    
   
659

   
987
    /**
660
    /**
988
     * Add entry to a ledger, even if the ledger has previous been fenced. This should only
661
     * Add entry to a ledger, even if the ledger has previous been fenced. This should only
989
     * happen in bookie recovery or ledger recovery cases, where entries are being replicates 
662
     * happen in bookie recovery or ledger recovery cases, where entries are being replicates 
[+20] [20] 97 lines
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java
Revision fa5171b New Change
 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java
Revision 8418469 New Change
 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
New File
 
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
Revision c172224 New Change
 
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
Revision e795fce New Change
 
  1. bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java: Loading...
  2. bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java: Loading...
  3. bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java: Loading...
  4. bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java: Loading...
  5. bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java: Loading...
  6. bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java: Loading...