Index: src/mongo/db/btreecursor.cpp
===================================================================
--- src/mongo/db/btreecursor.cpp	(revision 46272)
+++ src/mongo/db/btreecursor.cpp	(revision 47136)
@@ -23,6 +23,8 @@
 #include "curop-inl.h"
 #include "queryutil.h"
 
+#include "db/toku/cursor.h"
+
 namespace mongo {
 
     template< class V >
@@ -205,6 +207,9 @@
     BtreeCursor* BtreeCursor::make( NamespaceDetails * nsd , int idxNo , const IndexDetails& indexDetails ) {
         int v = indexDetails.version();
+
+        if( v == 2 )
+            return new TokuDBCursor( nsd , idxNo , indexDetails );
 
         if( v == 1 )
             return new BtreeCursorImpl<V1>( nsd , idxNo , indexDetails );
Index: src/mongo/db/indexkey.cpp
===================================================================
--- src/mongo/db/indexkey.cpp	(revision 46272)
+++ src/mongo/db/indexkey.cpp	(revision 47136)
@@ -406,6 +406,8 @@
             g.getKeys( obj, keys );
             break;
         }
+        // tokudb: use btree v1 key format
+        case 2:
         case 1: {
             KeyGeneratorV1 g( *this );
             g.getKeys( obj, keys );
Index: src/mongo/db/ops/update.cpp
===================================================================
--- src/mongo/db/ops/update.cpp	(revision 46272)
+++ src/mongo/db/ops/update.cpp	(revision 47136)
@@ -347,6 +347,15 @@
 
                 auto_ptr<ModSetState> mss = useMods->prepare( onDisk );
 
+                // tokudb: modsIsIndexed must be true if there exists at least one clustering index
+                for (int idx_i = 0; idx_i < d->nIndexesBeingBuilt(); idx_i++) {
+                    IndexDetails &idx = d->idx(idx_i);
+                    if (idx.info.obj()["clustering"].trueValue()) {
+                        modsIsIndexed = true;
+                        break;
+                    }
+                }
+
                 bool willAdvanceCursor = multi && c->ok() && ( modsIsIndexed || ! mss->canApplyInPlace() );
 
                 if ( willAdvanceCursor ) {
Index: src/mongo/db/toku/invariant.h
===================================================================
--- src/mongo/db/toku/invariant.h	(revision 0)
+++ src/mongo/db/toku/invariant.h	(revision 47136)
@@ -0,0 +1,22 @@
+/**
+* Copyright (C) 2012 Tokutek Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef TOKUDB_INVARIANT_H
+#define TOKUDB_INVARIANT_H
+
+#define invariant(x) { if (!(x)) { fprintf(stderr, "%s:%d Assertion `" #x "' failed\n", __FUNCTION__, __LINE__); fflush(stderr); massert(16399, "toku assert fail", 0); }}
+
+#endif /* TOKUDB_INVARIANT_H */
Index: src/mongo/db/toku/row_buffer.cpp
===================================================================
--- src/mongo/db/toku/row_buffer.cpp	(revision 0)
+++ src/mongo/db/toku/row_buffer.cpp	(revision 47136)
@@ -0,0 +1,142 @@
+/**
+* Copyright (C) 2012 Tokutek Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+
+#include "db/toku/row_buffer.h"
+#include "db/toku/invariant.h"
+
+namespace mongo {
+
+    RowBuffer::RowBuffer() :
+        _size(_BUF_SIZE_PREFERRED),
+        _end_offset(0),
+        _current_offset(0) {
+        _buf = reinterpret_cast<char *>(malloc(_size));
+        invariant(_buf);
+    }
+
+    RowBuffer::~RowBuffer() {
+        free(_buf);
+    }
+
+    // append the given key, loc and obj to the end of the buffer
+    //
+    // important note: empty bson objects still take up 5 bytes. so we
+    // always write at least 5 bytes to disk for the key and obj
+    void RowBuffer::append(const BSONObj &key, const DiskLoc &loc, const BSONObj &obj) {
+        size_t key_size = key.objsize();
+        size_t loc_size = sizeof(DiskLoc);
+        size_t obj_size = obj.objsize();
+        size_t size_needed = _end_offset + key_size + loc_size + obj_size;
+
+        // if we need more than we have, realloc.
+        if (size_needed > _size) {
+            _buf = reinterpret_cast<char *>(realloc(_buf, size_needed));
+            invariant(_buf);
+            _size = size_needed;
+        }
+
+        // append the key, update the end offset
+        memcpy(_buf + _end_offset, key.objdata(), key_size);
+        _end_offset += key_size;
+        // append the loc, update the end offset
+        memcpy(_buf + _end_offset, &loc, loc_size);
+        _end_offset += loc_size;
+        // append the obj, update the end offset
+        memcpy(_buf + _end_offset, obj.objdata(), obj_size);
+        _end_offset += obj_size;
+
+        // postcondition: end offset is correctly bounded
+        invariant(_end_offset <= _size);
+    }
+
+    // the row buffer is gorged if its current size is greater
+    // than the preferred size or is almost full.
+    // return:
+    //      true, buffer is gorged
+    //      false, buffer could fit more data
+    // rationale:
+    //      - if true, then it makes more sense to empty and refill
+    //        the buffer than trying to stuff more in it.
+    //      - if false, then an append probably won't cause a realloc,
+    //        so go ahead and do it.
+    bool RowBuffer::isGorged() const {
+        const int threshold = 100;
+        const bool almost_full = _end_offset + threshold > _size;
+        const bool too_big = _size > _BUF_SIZE_PREFERRED;
+        return almost_full || too_big;
+    }
+
+    // move the internal buffer position to the next key, loc pair
+    // returns:
+    //      true, the buffer is ready to be read via current()
+    //      false, the buffer has no more data
+    bool RowBuffer::next() {
+        // precondition: there exists a next element to advance to
+        invariant(_current_offset < _end_offset);
+
+        // seek past the current key, loc, and obj.
+        size_t key_size = currentKey().objsize();
+        size_t loc_size = sizeof(DiskLoc);
+        size_t obj_size = currentObj().objsize();
+        _current_offset += key_size + loc_size + obj_size;
+
+        // postcondition: we did not seek past the end of the buffer
+        invariant(_current_offset <= _end_offset);
+
+        // next succeeds if the current offset is still before the end
+        return _current_offset < _end_offset ? true : false;
+    }
+
+    // get the current key from the buffer. the buffer must have
+    // valid data in order to call this, assert otherwise.
+    BSONObj RowBuffer::currentKey() const {
+        invariant(_current_offset < _end_offset);
+        const char *key_buf = _buf + _current_offset;
+        BSONObj key = BSONObj(key_buf);
+        return key;
+    }
+
+    // get the current diskloc from the buffer. the buffer must have
+    // valid data in order to call this, assert otherwise.
+    DiskLoc RowBuffer::currentLoc() const {
+        invariant(_current_offset < _end_offset);
+        const char *key_buf = _buf + _current_offset;
+        BSONObj key = BSONObj(key_buf);
+        DiskLoc *loc = (DiskLoc *) (key_buf + key.objsize());
+        return *loc;
+    }
+
+    // get the current obj from the buffer.
+    BSONObj RowBuffer::currentObj() const {
+        invariant(_current_offset < _end_offset);
+        const char *key_buf = _buf + _current_offset;
+        BSONObj key = BSONObj(key_buf);
+        const char *obj_buf = key_buf + key.objsize() + sizeof(DiskLoc);
+        BSONObj obj = BSONObj(obj_buf);
+        return obj;
+    }
+
+    // empty the row buffer, resetting all data and internal positions
+    void RowBuffer::empty() {
+        free(_buf);
+        _buf = reinterpret_cast<char *>(malloc(_BUF_SIZE_PREFERRED));
+        _size = _BUF_SIZE_PREFERRED;
+        _current_offset = 0;
+        _end_offset = 0;
+    }
+} /* namespace mongo */
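
A standalone sketch of the row layout RowBuffer implements (illustration only, not part of the patch; all names below are invented). Rows are packed back to back as [key | DiskLoc | obj] with no per-row length fields, because BSON is self-describing: the first four bytes of a BSONObj are its total size. The sketch mimics that with 4-byte length-prefixed blobs and shows the same append/next walk:

// row_buffer_sketch.cpp -- hypothetical demo, not from the patch
#include <cstdio>
#include <cstring>
#include <stdint.h>
#include <string>
#include <vector>

struct Loc { uint32_t file, offset; };   // stand-in for mongo::DiskLoc

// append one length-prefixed blob, mimicking BSON's leading int32 objsize
static void appendBlob(std::vector<char> &buf, const std::string &s) {
    uint32_t sz = 4 + s.size();
    buf.insert(buf.end(), (const char *)&sz, (const char *)&sz + 4);
    buf.insert(buf.end(), s.begin(), s.end());
}

int main() {
    std::vector<char> buf;
    const char *keys[] = { "key0", "key1" };
    const char *objs[] = { "obj0", "obj1" };
    for (int i = 0; i < 2; i++) {        // append(): write [key | loc | obj]
        appendBlob(buf, keys[i]);
        Loc loc = { 1, (uint32_t)(100 * i) };
        buf.insert(buf.end(), (const char *)&loc, (const char *)&loc + sizeof loc);
        appendBlob(buf, objs[i]);
    }
    // next()/current*(): read each size prefix and hop over key, loc, obj
    for (size_t off = 0; off < buf.size(); ) {
        uint32_t key_sz, obj_sz;
        Loc loc;
        memcpy(&key_sz, &buf[off], 4);
        memcpy(&loc, &buf[off + key_sz], sizeof loc);
        memcpy(&obj_sz, &buf[off + key_sz + sizeof loc], 4);
        printf("key=%.4s loc=%u:%u obj=%.4s\n",
               &buf[off + 4], loc.file, loc.offset,
               &buf[off + key_sz + sizeof loc + 4]);
        off += key_sz + sizeof loc + obj_sz;   // seek past all three
    }
    return 0;
}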
Index: src/mongo/db/toku/cursor.h
===================================================================
--- src/mongo/db/toku/cursor.h	(revision 0)
+++ src/mongo/db/toku/cursor.h	(revision 47136)
@@ -0,0 +1,146 @@
+/**
+* Copyright (C) 2012 Tokutek Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef TOKUDB_CURSOR_H
+#define TOKUDB_CURSOR_H
+
+#include <db.h>
+
+#include "db/btree.h"
+
+#include "db/toku/invariant.h"
+#include "db/toku/row_buffer.h"
+
+namespace mongo {
+
+    // A TokuDB cursor extends a BtreeCursor's public interface.
+    // Internally, it uses a real tokudb cursor and it caches the
+    // last read row for speed.
+    class TokuDBCursor : public BtreeCursor {
+    public:
+
+        // must use the BtreeCursor::make() factory for a new tokudb cursor
+        TokuDBCursor(NamespaceDetails *nsd, int idxNo, const IndexDetails &idx);
+
+        // create a cursor for a given range, given:
+        // - start, end keys
+        // - endKeyInclusive: whether to fetch the end key. just assume true. caller can filter.
+        // - direction: not sure
+        virtual void init(const BSONObj &startKey, const BSONObj &endKey, bool endKeyInclusive, int direction);
+
+        // create a cursor, given
+        // - bounds: set of intervals ie (0, 1] that this cursor is valid over
+        // - singleIntervalLimit: sounds like we're only allowed to use one interval
+        // - direction: not sure
+        virtual void init(const shared_ptr<FieldRangeVector> &bounds, int singleIntervalLimit, int direction);
+
+        // useful function: return the associated diskloc for the current key
+        virtual DiskLoc currLoc();
+
+        // it appears this is only used by the btree cursor code for
+        // checkLocation(), which we don't support, so we do nothing here.
+        virtual BSONObj keyAt(int ofs) const;
+
+        // get the current key for this cursor. it's probably okay to cache this
+        // for the lifetime of a call to advance(), but no later, since writes
+        // can happen in between calls to advance().
+        virtual BSONObj currKey() const;
+
+        // get the associated document for the cursor's current position.
+        //
+        // if we're using a clustered index, then we can return the current
+        // object for this cursor since we already have the data.
+        //
+        // if not, do it the way the btreecursor does - make a bsonobj by
+        // reading from the data file heap (potentially causes IO)
+        virtual BSONObj current();
+
+        // apparently only used in unit tests. unsupported.
+        virtual bool curKeyHasChild();
+
+        // returns: true if some keys were skipped, false otherwise.
+        // we don't need to skip keys because the tokudb cursor isn't
+        // going to give back stale data (ie: data that was deleted).
+        // the mongo cursor could so it uses this to skip them.
+        // we never skip so we always return false.
+        virtual bool skipUnusedKeys();
+
+        // called after a cursor recovers from a yield. that means some
+        // write operations could have occurred: cached data may be stale.
+        virtual void checkLocation();
+
+        // this sets keyOfs to something. we don't care. set to zero, do nothing.
+        void _advanceTo(DiskLoc &thisLoc, int &keyOfs, const BSONObj &keyBegin,
+                int keyBeginLen, bool afterKey, const vector< const BSONElement * > &keyEnd,
+                const vector< bool > &keyEndInclusive, const Ordering &order, int direction);
+
+        // move the cursor in the given direction.
+        // tokudb cursors do not care about the keyofs. so don't touch it.
+        // we should return a non null diskloc if we found the next key,
+        // or the null diskloc if we couldn't.
+        virtual DiskLoc _advance(const DiskLoc& thisLoc, int& keyOfs, int direction, const char *caller);
+
+        // this doesn't look very interesting. unsupported.
+        virtual void _audit();
+
+        // this is only used for checkLocation and in a cursor init function
+        // that tries to set the current btree bucket, which is meaningless
+        // for a tokudb cursor. the only thing that matters is that we return
+        // the null recordloc if we can't find the key.
+        virtual DiskLoc _locate(const BSONObj& key, const DiskLoc& loc);
+
+        // close the underlying tokudb cursor, if it exists
+        virtual ~TokuDBCursor();
+
+    private:
+        // tokudb DB cursor and associated txn
+        DBC *cursor;
+        DB_TXN *txn;
+
+        // row buffer to cache rows read from the ydb layer, using bulk fetch
+        RowBuffer row_buffer;
+
+        // initialize the tokudb cursor and position it over the start key
+        void init_internal(const BSONObj &startKey, const BSONObj &endKey);
+
+        // set the cursor to the first key >= given key
+        // return:
+        //      true, the cursor was set successfully, may proceed
+        //      false, the cursor could not be set, no key >= given
+        bool set_cursor(const BSONObj &key);
+
+        // bulk fetch from the cursor and store the rows into the buffer
+        // return:
+        //      true, more rows were read from tokudb and stored
+        //      false, no more rows to read from the cursor
+        bool bulk_fetch_more_rows();
+
+        // how many times did we bulk fetch into the buffer? this
+        // lets us figure out an appropriate amount of data to
+        // fetch next time. the idea is we don't want to fetch
+        // every single key between startKey and endKey into the
+        // buffer right away, but rather get exponentially more each
+        // time we need it.
+        int bulk_fetch_iteration;
+
+        // is this cursor over a clustering index? if so, the document
+        // associated with each index row is stored with the row, so
+        // reads don't have to go to the main data file.
+        bool clustering;
+    };
+}
+
+#endif /* TOKUDB_CURSOR_H */
Index: src/mongo/db/toku/row_buffer.h
===================================================================
--- src/mongo/db/toku/row_buffer.h	(revision 0)
+++ src/mongo/db/toku/row_buffer.h	(revision 47136)
@@ -0,0 +1,80 @@
+/**
+* Copyright (C) 2012 Tokutek Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef TOKUDB_ROW_BUFFER_H
+#define TOKUDB_ROW_BUFFER_H
+
+#include "db/pdfile.h"
+
+namespace mongo {
+
+    class RowBuffer {
+    public:
+        RowBuffer();
+
+        ~RowBuffer();
+
+        // append the given key, loc and obj to the end of the buffer
+        void append(const BSONObj &key, const DiskLoc &loc, const BSONObj &obj);
+
+        // the row buffer is gorged if its current size is greater
+        // than or equal to the preferred size.
+        // return:
+        //      true, buffer is gorged
+        //      false, buffer could fit more data
+        // rationale:
+        //      - if true, then it makes more sense to empty and refill
+        //        the buffer than trying to stuff more in it.
+        //      - if false, then an append probably won't cause a realloc,
+        //        so go ahead and do it.
+        bool isGorged() const;
+
+        // move the internal buffer position to the next key, loc pair
+        // returns:
+        //      true, the buffer is ready to be read via current()
+        //      false, the buffer has no more data
+        bool next();
+
+        // get the current key from the buffer.
+        BSONObj currentKey() const;
+
+        // get the current diskloc from the buffer.
+        DiskLoc currentLoc() const;
+
+        // get the current obj from the buffer.
+        BSONObj currentObj() const;
+
+        // empty the row buffer, resetting all data and internal positions
+        void empty();
+
+    private:
+        // store rows in a buffer that has a "preferred size". if we need to
+        // fit more in the buf, then it's okay to go over. _size captures the
+        // real size of the buffer.
+        // _end_offset is where we will write new bytes for append(). it is
+        // modified and advanced after the append.
+        // _current_offset is where we will read for current(). it is modified
+        // and advanced after a next()
+        static const size_t _BUF_SIZE_PREFERRED = 128 * 1024;
+        char *_buf;
+        size_t _size;
+        size_t _end_offset;
+        size_t _current_offset;
+    };
+
+} /* namespace mongo */
+
+#endif /* TOKUDB_ROW_BUFFER_H */
Index: src/mongo/db/toku/index.cpp
===================================================================
--- src/mongo/db/toku/index.cpp	(revision 0)
+++ src/mongo/db/toku/index.cpp	(revision 47136)
@@ -0,0 +1,135 @@
+/** @file toku/index.cpp */
+
+/**
+* Copyright (C) 2012 Tokutek Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+
+#include <db.h>
+
+#include "db/toku/env.h"
+#include "db/toku/index.h"
+#include "db/toku/cursor.h"
+#include "db/toku/dbt-bson-inl.h"
+#include "db/toku/invariant.h"
+
+#include "db/namespace_details.h"
+#include "db/btree.h"
+
+namespace mongo {
+
+    void dump_bson(BSONObj obj) {
+        std::cout << obj.toString() << std::endl;
+    }
+
+    // drop the index in the environment
+    void IndexInterfaceTokuDB::dropIndex(const IndexDetails &idx) {
+        toku::env_drop_index(idx);
+    }
+
+    struct cursor_getf_cb_extra {
+        BSONObj *obj;
+        DiskLoc *loc;
+    };
+
+    // return the diskloc associated with the given key in the index
+    DiskLoc IndexInterfaceTokuDB::findSingle(const IndexDetails &idx,
+            const DiskLoc &thisLoc, const BSONObj &key) const {
+        const char *ns = idx.parentNS().c_str();
+        NamespaceDetails *nsd = nsindex(ns)->details(ns);
+        invariant(nsd);
+        TokuDBCursor cursor(nsd, nsd->idxNo(idx), idx);
+        // initialize a cursor over the interval [key, key]
+        // where endkeyinclusive = true and direction = 1
+        cursor.init(key, key, true, 1);
+        DiskLoc loc = DiskLoc();
+        if (cursor.ok() && cursor.currKey() == key) {
+            loc = cursor.currLoc();
+        }
+        return loc;
+    }
+
+    // stolen from btree.cpp
+    // generate an error string for a duplicate key in the given idx
+    static std::string dup_key_error_string(const IndexDetails &idx,
+            const BSONObj &key) {
+        std::stringstream ss;
+        ss << "E11000 duplicate key error ";
+        ss << "index: " << idx.indexNamespace() << " ";
+        ss << "dup key: " << key.toString();
+        return ss.str();
+    }
+
+    // index the given key, associating it with the recordLoc
+    // - not sure what toplevel is.
+    int IndexInterfaceTokuDB::insert(const DiskLoc thisLoc, const DiskLoc recordLoc,
+            const BSONObj &key, const Ordering &order, bool dupsAllowed,
+            IndexDetails& idx, const BSONObj *obj, bool toplevel) const {
+        DB *db = toku::env_get_db_handle_by_idx(idx);
+        // get an index key that is (bson obj key, recordLoc)
+        char key_buf[toku::index_key_size(key)];
+        DBT index_key = toku::generate_index_key(key_buf, key, recordLoc);
+        DBT val_dbt;
+        if (obj) {
+            // if we were given the object, then that's the value.
+            val_dbt = toku::init_dbt_from_bson_obj(*obj);
+        } else {
+            // if we weren't given the object, then store an empty one.
+            val_dbt = toku::init_dbt_from_bson_obj(BSONObj());
+        }
+        if (!dupsAllowed) {
+            DiskLoc loc = findSingle(idx, thisLoc, key);
+            if (!loc.isNull()) {
+                // this is how mongo deals with dup key errors.
+                // they throw a user exception that says so.
+                const std::string error_string = dup_key_error_string(idx, key);
+                uasserted(ASSERT_ID_DUPKEY, error_string);
+            }
+        }
+        int r = db->put(db, NULL, &index_key, &val_dbt, 0);
+        invariant(r == 0);
+        return r;
+    }
+
+    int IndexInterfaceTokuDB::bt_insert(const DiskLoc thisLoc, const DiskLoc recordLoc,
+            const BSONObj &key, const Ordering &order, bool dupsAllowed,
+            IndexDetails &idx, bool toplevel) const {
+        // pass null for the obj so the insert does not cluster
+        return insert(thisLoc, recordLoc, key, order, dupsAllowed, idx, NULL, toplevel);
+    }
+
+    // this version of bt_insert takes the full document, for clustering
+    int IndexInterfaceTokuDB::bt_insert_clustering(const DiskLoc thisLoc, const DiskLoc recordLoc,
+            const BSONObj& key, const Ordering &order, bool dupsAllowed,
+            IndexDetails& idx, const BSONObj &obj, bool toplevel) const {
+        // pass the address of the object so the insert clusters
+        return insert(thisLoc, recordLoc, key, order, dupsAllowed, idx, &obj, toplevel);
+    }
+
+    // unindex the given key. the index key is bson obj key + recordloc
+    bool IndexInterfaceTokuDB::unindex(const DiskLoc thisLoc, IndexDetails &idx,
+            const BSONObj& key, const DiskLoc recordLoc) const {
+        DB *db = toku::env_get_db_handle_by_idx(idx);
+        // get an index key that is (bson obj key, recordLoc)
+        char key_buf[toku::index_key_size(key)];
+        DBT index_key = toku::generate_index_key(key_buf, key, recordLoc);
+        const int flags = DB_DELETE_ANY;
+        int r = db->del(db, NULL, &index_key, flags);
+        invariant(r == 0);
+        return r == 0 ? true : false;
+    }
+} /* namespace mongo */
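
A standalone sketch of the clustering-vs-plain value choice insert() makes above (illustration only, not part of the patch; std::map stands in for the tokudb dictionary and a string for the document):

// clustering_sketch.cpp -- hypothetical demo, not from the patch
#include <stdint.h>
#include <cstdio>
#include <map>
#include <string>
#include <utility>

typedef std::pair<std::string, uint64_t> IndexKey;   // (key bytes, recordLoc)

static void insertRow(std::map<IndexKey, std::string> &db, bool clustering,
                      const std::string &key, uint64_t loc, const std::string &doc) {
    // clustering: the value is the full document, so reads never touch the heap.
    // non-clustering: the value is empty; reads must fetch the doc by loc.
    db[IndexKey(key, loc)] = clustering ? doc : "";
}

int main() {
    std::map<IndexKey, std::string> clustered, plain;
    insertRow(clustered, true,  "a:1", 42, "{a:1, b:2}");
    insertRow(plain,     false, "a:1", 42, "{a:1, b:2}");
    // a clustered lookup already has the document...
    printf("clustered value: %s\n", clustered[IndexKey("a:1", 42)].c_str());
    // ...a plain lookup gets "" and must dereference loc 42 into the data file
    printf("plain value: '%s'\n", plain[IndexKey("a:1", 42)].c_str());
    return 0;
}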
Index: src/mongo/db/toku/env.cpp
===================================================================
--- src/mongo/db/toku/env.cpp	(revision 0)
+++ src/mongo/db/toku/env.cpp	(revision 47136)
@@ -0,0 +1,284 @@
+/**
+* Copyright (C) 2012 Tokutek Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+
+#include "db/toku/env.h"
+#include "db/toku/dbt-bson-inl.h"
+#include "db/toku/invariant.h"
+
+#include <boost/filesystem.hpp>
+
+#include "db/namespace_details.h"
+#include "db/cmdline.h"
+
+namespace toku {
+
+    static DB_ENV *env;
+    static std::vector<std::pair<std::string, DB *> > tokudb_indexes;
+
+    // we use txns and logging if dur/journaling was set on the command line
+    static bool using_txns(void) {
+        return mongo::cmdLine.dur ? true : false;
+    }
+
+    // if using txns and logging, begin a txn for this cursor
+    // requires: environment is opened from a previous call to get a db
+    void env_maybe_begin_txn(DB_TXN **txn) {
+        invariant(env);
+        if (using_txns()) {
+            int r = env->txn_begin(env, NULL, txn, 0);
+            invariant(r == 0);
+        } else {
+            *txn = NULL;
+        }
+    }
+
+    // if using txns and logging, commit the txn in the cursor
+    // requires: environment is opened from a previous call to get a db
+    void env_maybe_commit_txn(DB_TXN *txn) {
+        invariant(env);
+        if (using_txns()) {
+            int r = txn->commit(txn, 0);
+            invariant(r == 0);
+        } else {
+            invariant(txn == NULL);
+        }
+    }
+
+    // for a given idx, what's the tokudb db name?
+    static std::string generate_idx_db_name(const mongo::IndexDetails &idx) {
+        // the index namespace has ns and index name, so it's unique
+        const std::string &ns = idx.indexNamespace();
+        return ns;
+    }
+
+    // tokudb environment comparison functions to compare
+    // two bson objects, converted from DBTs
+    static int comparison_function(DB *db, const DBT *key1, const DBT *key2) {
+        int c;
+
+        // extract bson objects from each dbt and get the ordering
+        invariant(db->cmp_descriptor);
+        const DBT *key_pattern_dbt = &db->cmp_descriptor->dbt;
+        invariant(key_pattern_dbt->data);
+        const mongo::BSONObj key_pattern = init_bson_from_dbt(key_pattern_dbt);
+        const mongo::Ordering ordering = mongo::Ordering::make(key_pattern);
+
+        // first compare by bson obj key
+        const mongo::BSONObj obj1 = init_bson_from_dbt(key1);
+        const mongo::BSONObj obj2 = init_bson_from_dbt(key2);
+        c = obj1.woCompare(obj2, ordering);
+        if (c < 0) {
+            return -1;
+        } else if (c > 0) {
+            return 1;
+        }
+
+        // the bson objs must be equal, so compare by recordloc,
+        // which is stored after the bson object in the buffer
+        const char *buf1 = reinterpret_cast<const char *>(key1->data) + obj1.objsize();
+        const char *buf2 = reinterpret_cast<const char *>(key2->data) + obj2.objsize();
+        const mongo::DiskLoc *loc1 = reinterpret_cast<const mongo::DiskLoc *>(buf1);
+        const mongo::DiskLoc *loc2 = reinterpret_cast<const mongo::DiskLoc *>(buf2);
+        c = loc1->compare(*loc2);
+        if (c < 0) {
+            return -1;
+        } else if (c == 0) {
+            return 0;
+        } else {
+            return 1;
+        }
+    }
+
+    static void open_env(void) {
+        int r;
+        r = db_env_create(&env, 0);
+        invariant(r == 0);
+
+        env->set_errfile(env, stderr);
+        printf("tokudb: set errfile to stderr\n");
+
+        const char *errpfx = "[tokudb environment error]";
+        env->set_errpfx(env, errpfx);
+        printf("tokudb: set errpfx to %s\n", errpfx);
+
+        const int bytes = 0;
+        const int gigabytes = 8;
+        r = env->set_cachesize(env, gigabytes, bytes, 1);
+        invariant(r == 0);
+        printf("tokudb: cachesize set to %d GB + %d bytes\n", gigabytes, bytes);
+
+        r = env->set_default_bt_compare(env, comparison_function);
+        invariant(r == 0);
+
+        // tokudb on mongo can run with or without txn/logging. a single tokudb
+        // environment can also run with logging for a while, then without logging
+        // for a while, etc, without rebuilding the environment. the only catch
+        // here is I think you MUST perform a clean shutdown with logging enabled
+        // if you wish to subsequently run without logging, or else weird things
+        // can happen with the log the next time you try to run with logging.
+        const int log_flags = using_txns() ? DB_INIT_LOG | DB_INIT_TXN : 0;
+        const int flags = DB_CREATE | DB_PRIVATE | DB_RECOVER | log_flags;
+        boost::filesystem::path env_path = boost::filesystem::path(mongo::dbpath) / "tokudb";
+        boost::filesystem::create_directory(env_path);
+        const char *env_dir = env_path.string().c_str();
+        r = env->open(env, env_dir, flags, 0755);
+        invariant(r == 0);
+        printf("tokudb: environment opened at %s, logging %s\n", env_dir,
"enabled" : "disabled"); + + const int checkpoint_period = 60; + r = env->checkpointing_set_period(env, checkpoint_period); + invariant(r == 0); + printf("tokudb: checkpoint period set to %d sec\n", checkpoint_period); + + const int cleaner_period = 2; + r = env->cleaner_set_period(env, cleaner_period); + invariant(r == 0); + printf("tokudb: cleaner period set to %d sec\n", cleaner_period); + + const int cleaner_iterations = 5; + r = env->cleaner_set_iterations(env, cleaner_iterations); + invariant(r == 0); + printf("tokudb: cleaner iterations set to %d\n", cleaner_iterations); + } + + // open the environment if it isn't already + static void maybe_open_env(void) { + if (env == NULL) { + open_env(); + } + invariant(env); + } + + // open a db and save the mapping (name -> DB handle) in the vector of indexes + static DB *open_db(const std::string &db_name) { + DB *db; + int r = db_create(&db, env, 0); + invariant(r == 0); + r = db->open(db, NULL, db_name.c_str(), NULL, DB_BTREE, DB_CREATE, 0644); + invariant(r == 0); + printf("tokudb: opened db named %s\n", db_name.c_str()); + tokudb_indexes.push_back(std::pair(db_name, db)); + return db; + } + + // close the db with the given name and remove it from the map + static void close_db(const std::string &db_name) { + DB *db = NULL; + std::vector >::iterator i; + for (i = tokudb_indexes.begin(); i != tokudb_indexes.end(); i++) { + if (i->first == db_name) { + db = i->second; + break; + } + } + invariant(db); + int r = db->close(db, 0); + invariant(r == 0); + printf("tokudb: closed db named %s\n", db_name.c_str()); + tokudb_indexes.erase(i); + } + + // search through the map of dbs for one with the given namespace and name + // return: + // - open db if it was found + // - NULL if no open db was found + static DB *get_db_by_name(const std::string &db_name) { + DB *db = NULL; + std::vector >::iterator i; + for (i = tokudb_indexes.begin(); i != tokudb_indexes.end(); i++) { + if (i->first == db_name) { + db = i->second; + break; + } + } + return db; + } + + // set a descriptor for the given dictionary. the descriptor is + // a serialization of an idx's keyPattern. + static void set_db_descriptor(DB *db, const mongo::IndexDetails &idx) { + DBT ordering_dbt; + ordering_dbt.data = (void *) idx.keyPattern().objdata(); + ordering_dbt.size = idx.keyPattern().objsize(); + ordering_dbt.ulen = idx.keyPattern().objsize(); + ordering_dbt.flags = DB_DBT_USERMEM; + //DB_TXN *txn; + //env_maybe_begin_txn(&txn); + const int flags = DB_UPDATE_CMP_DESCRIPTOR; + int r = db->change_descriptor(db, NULL /*txn*/, &ordering_dbt, flags); + //env_maybe_commit_txn(txn); + invariant(r == 0); + } + + // get and maybe open a DB handle for the given idx + // return: + // - a shared, open db handle for this idx. + DB *env_get_db_handle_by_idx(const mongo::IndexDetails &idx) { + maybe_open_env(); + // try to get an existing open db. this is fast so long + // the number of indexes is small, and it will be for now. + const std::string db_name = generate_idx_db_name(idx); + DB *db = get_db_by_name(db_name); + + // open the db for this index name if it isn't already + if (db == NULL) { + db = open_db(db_name); + set_db_descriptor(db, idx); + } + invariant(db); + return db; + } + + // TODO: this index should be 'closed' by the caller first, not here. 
+    // drop the index by removing the db
+    void env_drop_index(const mongo::IndexDetails &idx) {
+        maybe_open_env();
+        int r;
+        // see if there is an open handle and close it if so
+        const std::string &db_name = generate_idx_db_name(idx);
+        DB *db = get_db_by_name(db_name);
+        if (db) {
+            close_db(db_name);
+        }
+        // remove the dictionary from the environment
+        printf("tokudb: dropping db %s\n", db_name.c_str());
+        r = env->dbremove(env, NULL, db_name.c_str(), NULL, 0);
+        // it's okay to drop an index that doesn't exist, only
+        // if we didn't have it open before this call.
+        invariant(r == 0 || (db == NULL && r == ENOENT));
+    }
+
+    // shutdown tokudb by closing all dictionaries and the env, if open.
+    void env_shutdown(void) {
+        maybe_open_env();
+        int r;
+        invariant(env);
+        std::vector<std::pair<std::string, DB *> >::iterator i;
+        for (i = tokudb_indexes.begin(); i != tokudb_indexes.end(); i++) {
+            const std::string &db_name = i->first;
+            DB *db = i->second;
+            printf("tokudb: closing dictionary %s\n", db_name.c_str());
+            r = db->close(db, 0);
+            invariant(r == 0);
+        }
+        printf("tokudb: closing environment\n");
+        r = env->close(env, 0);
+        invariant(r == 0);
+    }
+
+} /* namespace toku */
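
A standalone sketch of the optional-transaction pattern env_maybe_begin_txn / env_maybe_commit_txn implement above (illustration only, not part of the patch; all names invented): callers always write the begin/commit pair, and it collapses to a no-op (NULL txn) when journaling is off:

// maybe_txn_sketch.cpp -- hypothetical demo, not from the patch
#include <cstdio>
#include <cstdlib>

struct Txn { int id; };
static bool durable = false;        // stands in for mongo::cmdLine.dur
static int next_txn_id = 1;

// begin a txn only when durability is on, like env_maybe_begin_txn
static void maybe_begin_txn(Txn **txn) {
    if (durable) {
        *txn = new Txn();
        (*txn)->id = next_txn_id++;
    } else {
        *txn = NULL;
    }
}

// commit and free the txn if there is one, like env_maybe_commit_txn
static void maybe_commit_txn(Txn *txn) {
    if (txn) {
        printf("commit txn %d\n", txn->id);
        delete txn;
    } else {
        printf("no txn to commit (durability off)\n");
    }
}

int main() {
    Txn *t;
    maybe_begin_txn(&t);   // durability off: t == NULL, commit is a no-op
    maybe_commit_txn(t);
    durable = true;
    maybe_begin_txn(&t);   // durability on: a real txn is begun and committed
    maybe_commit_txn(t);
    return 0;
}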
Index: src/mongo/db/toku/index.h
===================================================================
--- src/mongo/db/toku/index.h	(revision 0)
+++ src/mongo/db/toku/index.h	(revision 47136)
@@ -0,0 +1,110 @@
+/**
+* Copyright (C) 2012 Tokutek Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef TOKUDB_INDEX_H
+#define TOKUDB_INDEX_H
+
+#include "db/index.h"
+#include "db/queryutil.h"
+
+#include "db/toku/invariant.h"
+
+namespace mongo {
+
+class IndexInterfaceTokuDB : public IndexInterface {
+public:
+
+    virtual void dropIndex(const IndexDetails &idx);
+
+    virtual int keyCompare(const BSONObj &k1, const BSONObj &k2, const Ordering &ordering) {
+        return k1.woCompare(k2, ordering, false);
+    }
+
+    IndexInsertionContinuation *beginInsertIntoIndex(
+            int idxNo, IndexDetails &_idx,
+            DiskLoc _recordLoc, const BSONObj &_key,
+            const Ordering& _order, bool dupsAllowed) {
+        printf("IndexInterfaceTokudb: %s not supported\n", __FUNCTION__);
+        return NULL;
+    }
+
+    virtual long long fullValidate(const DiskLoc& thisLoc, const BSONObj &order) {
+        printf("IndexInterfaceTokudb: %s not supported, doing nothing.\n", __FUNCTION__);
+        return true;
+    }
+
+    virtual DiskLoc findSingle(const IndexDetails &indexdetails , const DiskLoc& thisLoc, const BSONObj& key) const;
+
+    virtual bool unindex(const DiskLoc thisLoc, IndexDetails& id, const BSONObj& key, const DiskLoc recordLoc) const;
+
+    virtual int bt_insert(const DiskLoc thisLoc, const DiskLoc recordLoc,
+            const BSONObj& key, const Ordering &order, bool dupsAllowed,
+            IndexDetails& idx, bool toplevel = true) const;
+
+    // this version of bt_insert takes the full document, for clustering
+    virtual int bt_insert_clustering(const DiskLoc thisLoc, const DiskLoc recordLoc,
+            const BSONObj& key, const Ordering &order, bool dupsAllowed,
+            IndexDetails& idx, const BSONObj &obj, bool toplevel = true) const;
+
+    virtual DiskLoc addBucket(const IndexDetails& id) {
+        printf("IndexInterfaceTokudb: %s not supported, returning empty diskloc.\n", __FUNCTION__);
+        return DiskLoc();
+    }
+
+    virtual void uassertIfDups(IndexDetails& idx, vector<BSONObj*>& addedKeys, DiskLoc head, DiskLoc self, const Ordering& ordering) {
+        printf("IndexInterfaceTokudb: %s not supported, doing nothing.\n", __FUNCTION__);
+    }
+
+    // for geo:
+    // all unsupported for tokudb
+    virtual bool isUsed(DiskLoc thisLoc, int pos) {
+        printf("IndexInterfaceTokudb: %s not supported, returning false.\n", __FUNCTION__);
+        return false;
+    }
+
+    virtual void keyAt(DiskLoc thisLoc, int pos, BSONObj& key, DiskLoc& recordLoc) {
+        printf("IndexInterfaceTokudb: %s not supported, returning empty bson and recordloc.\n", __FUNCTION__);
+        key = BSONObj();
+        recordLoc = DiskLoc();
+    }
+
+    virtual BSONObj keyAt(DiskLoc thisLoc, int pos) {
+        printf("IndexInterfaceTokudb: %s not supported, returning empty bson.\n", __FUNCTION__);
+        return BSONObj();
+    }
+
+    virtual DiskLoc locate(const IndexDetails &idx , const DiskLoc& thisLoc, const BSONObj& key, const Ordering &order,
+            int& pos, bool& found, const DiskLoc &recordLoc, int direction=1) {
+        printf("IndexInterfaceTokudb: %s not supported, returning empty diskloc.\n", __FUNCTION__);
+        return DiskLoc();
+    }
+
+    virtual DiskLoc advance(const DiskLoc& thisLoc, int& keyOfs, int direction, const char *caller) {
+        printf("IndexInterfaceTokudb: %s not supported, returning empty diskloc.\n", __FUNCTION__);
+        return DiskLoc();
+    }
+
+private:
+    // if obj != NULL, then cluster the object with the index row
+    int insert(const DiskLoc thisLoc, const DiskLoc recordLoc,
+            const BSONObj& key, const Ordering &order, bool dupsAllowed,
+            IndexDetails& idx, const BSONObj *obj, bool toplevel = true) const;
};

} /* namespace mongo */

+#endif /* TOKUDB_INDEX_H */
Index: src/mongo/db/toku/cursor.cpp
===================================================================
--- src/mongo/db/toku/cursor.cpp	(revision 0)
+++ src/mongo/db/toku/cursor.cpp	(revision 47136)
@@ -0,0 +1,297 @@
+/**
+* Copyright (C) 2012 Tokutek Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+
+#include "db/toku/env.h"
+#include "db/toku/cursor.h"
+#include "db/toku/dbt-bson-inl.h"
+#include "db/toku/invariant.h"
+#include "db/toku/row_buffer.h"
+
+namespace mongo {
+
+    TokuDBCursor::TokuDBCursor(NamespaceDetails *nsd, int idxNo, const IndexDetails &idx) :
+        BtreeCursor(nsd, idxNo, idx),
+        cursor(NULL),
+        txn(NULL),
+        row_buffer(),
+        bulk_fetch_iteration(0) {
+    }
+
+    // close the cursor and maybe commit the txn when the cursor is destroyed
+    TokuDBCursor::~TokuDBCursor() {
+        if (cursor) {
+            int r = cursor->c_close(cursor);
+            invariant(r == 0);
+            toku::env_maybe_commit_txn(txn);
+        }
+    }
+
+    // if either key or recordloc are non null inside the get callback,
+    // set them to the values just read from the db
+    struct cursor_getf_cb_extra {
+        RowBuffer *row_buffer;
+        int rows_fetched;
+        int rows_to_fetch;
+    };
+
+    // how many rows should we fetch for a particular iteration?
+    // crude performance testing shows us that it is quite expensive
+    // to go into the ydb layer and start bulk fetching. we should
+    // fetch exponentially more and more rows with each iteration so
+    // that we avoid too much ydb layer overhead without reading it
+    // all at once, which is too aggressive for queries that might
+    // not want all of it.
+    static int max_rows_to_fetch(int iteration) {
+        int rows_to_fetch = 1;
+        switch (iteration) {
+            case 0:
+                rows_to_fetch = 1;
+                break;
+            case 1:
+                rows_to_fetch = 64;
+                break;
+            case 2:
+                rows_to_fetch = 256;
+                break;
+            case 3:
+                rows_to_fetch = 1024;
+                break;
+            default:
+                rows_to_fetch = 4096;
+                break;
+        }
+        return rows_to_fetch;
+    }
+
+    // ydb layer cursor callback
+    static int cursor_getf_cb(const DBT *index_key, const DBT *val, void *extra) {
+        int r = 0;
+
+        // the cursor callback is called even if the desired
+        // key is not found. in that case, index_key == NULL
+        if (index_key) {
+            struct cursor_getf_cb_extra *info = (struct cursor_getf_cb_extra *) extra;
+            RowBuffer *row_buffer = info->row_buffer;
+
+            // put this row into the row buffer unconditionally
+            BSONObj row_key = toku::init_bson_from_dbt(index_key);
+            DiskLoc row_loc = toku::init_diskloc_from_dbt(index_key);
+            BSONObj row_obj = toku::init_bson_from_dbt(val);
+            row_buffer->append(row_key, row_loc, row_obj);
+            info->rows_fetched++;
+
+            // request more bulk fetching if we are allowed to fetch more rows
+            // and the row buffer is not too full.
+            if (info->rows_fetched < info->rows_to_fetch && !row_buffer->isGorged()) {
+                r = TOKUDB_CURSOR_CONTINUE;
+            }
+        }
+
+        return r;
+    }
+
+    // set the cursor to be the first key >= given key
+    // return:
+    //      true, the cursor was set successfully, may proceed
+    //      false, the cursor could not be set, no key >= given
+    // effect:
+    //      if there is no such key, set the btree bucket to the null diskloc,
+    //      so this cursor is marked as exhausted.
+    bool TokuDBCursor::set_cursor(const BSONObj &key) {
+        char key_buf[toku::index_key_size(key)];
+        const mongo::DiskLoc zero_diskloc = mongo::DiskLoc(0, 0);
+        DBT index_key = toku::generate_index_key(key_buf, key, zero_diskloc);
+
+        int rows_to_fetch = max_rows_to_fetch(bulk_fetch_iteration);
+        struct cursor_getf_cb_extra extra = { &row_buffer, 0, rows_to_fetch };
+        int r = cursor->c_getf_set_range(cursor, 0, &index_key, cursor_getf_cb, &extra);
+        invariant(r == 0 || r == DB_NOTFOUND);
+        bulk_fetch_iteration++;
+        return r == 0 ? true : false;
+    }
+
+    // bulk fetch from the cursor and store the rows into the buffer
+    // return:
+    //      true, more rows were read from tokudb and stored
+    //      false, no more rows to read from the cursor
+    bool TokuDBCursor::bulk_fetch_more_rows() {
+        //static Timer timer;
+        //timer.reset();
+        int rows_to_fetch = max_rows_to_fetch(bulk_fetch_iteration);
+        struct cursor_getf_cb_extra extra = { &row_buffer, 0, rows_to_fetch };
+        int r = cursor->c_getf_next(cursor, 0, cursor_getf_cb, &extra);
+        invariant(r == 0 || r == DB_NOTFOUND);
+        bulk_fetch_iteration++;
+        //printf("%s: finished, ret %d, iteration %d, _nscanned %lld, rows_fetched %d\n",
+        //        __FUNCTION__, r, bulk_fetch_iteration, _nscanned, extra.rows_fetched);
+        //printf("%s: fetched %d rows in %llu usecs\n",
+        //        __FUNCTION__, extra.rows_fetched, timer.micros());
+        return r == 0 ? true : false;
+    }
+
+    // initialize the tokudb cursor and position it over the start key
+    void TokuDBCursor::init_internal(const BSONObj &startKey, const BSONObj &endKey) {
+        // get a db, maybe a txn, and create a cursor over the db
+        DB *db = toku::env_get_db_handle_by_idx(indexDetails);
+        toku::env_maybe_begin_txn(&txn);
+        int r = db->cursor(db, txn, &cursor, 0);
+        invariant(r == 0);
+
+        // see if this cursor is over a clustering index
+        clustering = indexDetails.info.obj()["clustering"].trueValue();
+
+        // TODO: prelock range from startKey to endKey to get prefetching
+        // position the cursor over the first key >= startKey
+        bool ok = set_cursor(startKey);
+        bucket = ok ? minDiskLoc : DiskLoc();
+    }
+
+    // create cursor, given
+    // - start, end key range
+    // - ignore the others
+    void TokuDBCursor::init(const BSONObj &startKey, const BSONObj &endKey, bool endKeyInclusive, int direction) {
+        // for some reason we're not allowed to do this in the constructor
+        BtreeCursor::init(startKey, endKey, endKeyInclusive, direction);
+        init_internal(startKey, endKey);
+        invariant(cursor);
+    }
+
+    // create a cursor, given
+    // - bounds: set of intervals ie (0, 1] that this cursor is valid over
+    // - singleIntervalLimit: sounds like we're only allowed to use one interval
+    // - direction: do we iterate forward (using next) or backward (using prev)
+    void TokuDBCursor::init(const shared_ptr<FieldRangeVector> &bounds, int singleIntervalLimit, int direction) {
+        // for some reason we're not allowed to do this in the constructor
+        BtreeCursor::init( bounds, singleIntervalLimit, direction );
+        init_internal(startKey, endKey);
+        invariant(cursor);
+    }
+
+    // useful function: return the associated diskloc for the current key
+    DiskLoc TokuDBCursor::currLoc() {
+        DiskLoc loc = row_buffer.currentLoc();
+        return loc;
+    }
+
+    // get the associated document for the cursor's current position.
+    //
+    // if we're using a clustered index, then we can return the current
+    // object for this cursor since we already have the data.
+    //
+    // if not, do it the way the btreecursor does - make a bsonobj by
+    // reading from the data file heap (potentially causes IO)
+    BSONObj TokuDBCursor::current() {
+        BSONObj obj;
+        obj = row_buffer.currentObj();
+        if (clustering) {
+            // clustering indexes should store a non-empty obj
+            invariant(!obj.isEmpty());
+        } else {
+            // non-clustering indexes should store an empty obj
+            invariant(obj.isEmpty());
+            obj = BSONObj::make(_current());
+        }
+        return obj;
+    }
+
+    // it appears this is only used by the btree cursor code for
+    // checkLocation(), which we don't support, so we do nothing here.
+    BSONObj TokuDBCursor::keyAt(int ofs) const {
+        return BSONObj();
+    }
+
+    // get the current key for this cursor. it's probably okay to cache this
+    // for the lifetime of a call to advance(), but no later, since writes
+    // can happen in between calls to advance().
+    BSONObj TokuDBCursor::currKey() const {
+        BSONObj obj = row_buffer.currentKey();
+        return obj;
+    }
+
+    // apparently only used in unit tests. unsupported.
+    bool TokuDBCursor::curKeyHasChild() {
+        return false;
+    }
+
+    // returns: true if some keys were skipped, false otherwise.
+    // we don't need to skip keys because the tokudb cursor isn't
+    // going to give back stale data (ie: data that was deleted).
+    // the mongo cursor could so it uses this to skip them.
+    // we never skip so we always return false.
+    bool TokuDBCursor::skipUnusedKeys() {
+        return false;
+    }
+
+    // called after a cursor recovers from a yield. that means some
+    // write operations could have occurred: cached data may be stale.
+    //
+    // after decoding the btreecursor's implementation of checkLocation,
+    // the following invariants must hold after this call:
+    // - if the key we used to be on was deleted, we've moved forward.
+    // - if the key we used to be on still exists, it's our current.
+    // if we just reposition the toku cursor over the first key >= the
+    // old current key, we satisfy both invariants, because we won't
+    // get any row just deleted and we'll find the old key if it's there.
+    void TokuDBCursor::checkLocation() {
+        // empty out the stale row buffer data and reposition the cursor
+        // using the old current key. if we couldn't reposition, the cursor
+        // must be exhausted and we need to invalidate it.
+        BSONObj old_current_key = row_buffer.currentKey();
+        row_buffer.empty();
+        bool ok = set_cursor(old_current_key);
+        bucket = ok ? minDiskLoc : DiskLoc();
+    }
+
+    //
+    // BtreeCursor specific stuff declared pure virtual
+    //
+
+    // this sets keyOfs to something. we don't care. set to zero, do nothing.
+    void TokuDBCursor::_advanceTo(DiskLoc &thisLoc, int &keyOfs, const BSONObj &keyBegin,
+            int keyBeginLen, bool afterKey, const vector< const BSONElement * > &keyEnd,
+            const vector< bool > &keyEndInclusive, const Ordering &order, int direction ) {
+    }
+
+    // move the cursor in the given direction.
+    // tokudb cursors do not care about the keyofs. so don't touch it.
+    // we should return a non null diskloc if we found the next key,
+    // or the null diskloc if we couldn't.
+    DiskLoc TokuDBCursor::_advance(const DiskLoc& thisLoc, int& keyOfs, int direction, const char *caller) {
+        invariant(cursor);
+        // try to move the row buffer to the next row
+        bool ok = row_buffer.next();
+        if (!ok) {
+            // no more rows in the buffer. empty it and try to get more.
+            // if we can't, set bucket = null diskloc, invalidating it.
+            row_buffer.empty();
+            ok = bulk_fetch_more_rows();
+        }
+        return ok ? minDiskLoc : DiskLoc();
+    }
+
+    // this doesn't look very interesting. unsupported.
+    void TokuDBCursor::_audit() {
+    }
+
+    // look for an exact key, loc match in the index. if not found,
+    // return an invalid diskloc. the caller will invalidate the cursor.
+    DiskLoc TokuDBCursor::_locate(const BSONObj& key, const DiskLoc& loc) {
+        bool ok = set_cursor(key);
+        return ok && currLoc() == loc ? minDiskLoc : DiskLoc();
+    }
+} /* namespace mongo */
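
A standalone trace of the bulk-fetch schedule max_rows_to_fetch() drives above (illustration only, not part of the patch): the first call costs one row, so point queries stay cheap, and each later call requests more, up to a cap, so long scans amortize the per-call ydb overhead:

// bulk_fetch_schedule_sketch.cpp -- hypothetical demo, not from the patch
#include <cstdio>

// same schedule as TokuDBCursor's max_rows_to_fetch
static int max_rows_to_fetch(int iteration) {
    switch (iteration) {
        case 0:  return 1;
        case 1:  return 64;
        case 2:  return 256;
        case 3:  return 1024;
        default: return 4096;
    }
}

int main() {
    int total = 0;
    for (int iter = 0; iter < 6; iter++) {
        int n = max_rows_to_fetch(iter);   // rows requested by this c_getf_* call
        total += n;
        printf("iteration %d: fetch up to %4d rows (cumulative %d)\n", iter, n, total);
    }
    return 0;
}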
Index: src/mongo/db/toku/dbt-bson-inl.h
===================================================================
--- src/mongo/db/toku/dbt-bson-inl.h	(revision 0)
+++ src/mongo/db/toku/dbt-bson-inl.h	(revision 47136)
@@ -0,0 +1,92 @@
+/**
+* Copyright (C) 2012 Tokutek Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <db.h>
+
+#include "bson/bsonobj.h"
+
+#define UNUSED __attribute__((__unused__))
+
+namespace toku {
+
+// index keys are big enough to fit the bson object plus a diskloc
+UNUSED
+static size_t index_key_size(const mongo::BSONObj &obj) {
+    return obj.objsize() + sizeof(mongo::DiskLoc);
+}
+
+// generate an index key dbt for the given bson obj and diskloc.
+// the buf must be index_key_size(obj) bytes so everything fits.
+UNUSED
+static DBT generate_index_key(char *key_buf, const mongo::BSONObj &obj, const mongo::DiskLoc &loc) {
+    DBT index_key;
+    index_key.data = key_buf;
+    index_key.size = index_key_size(obj);
+    index_key.ulen = index_key_size(obj);
+    index_key.flags = DB_DBT_USERMEM;
+    // copy the object and diskloc into the dbt's buf
+    memcpy(key_buf, obj.objdata(), obj.objsize());
+    memcpy(key_buf + obj.objsize(), &loc, sizeof(mongo::DiskLoc));
+    return index_key;
+}
+
+// get a dbt whose data buffer and size are borrowed from the given bsonobj
+UNUSED
+static DBT init_dbt_from_bson_obj(const mongo::BSONObj &obj) {
+    DBT dbt;
+    dbt.data = const_cast<char *>(obj.objdata());
+    dbt.size = obj.objsize();
+    dbt.ulen = obj.objsize();
+    dbt.flags = DB_DBT_USERMEM;
+    return dbt;
+}
+
+// create a bson object from an index key. the index key contains the
+// bson object followed by the diskloc. if we initialize the object's
+// internal buffer to the index key's buffer, it will only 'see' the
+// bson object and ignore that the recordloc is afterwards.
+UNUSED
+static inline mongo::BSONObj init_bson_from_dbt(const DBT *index_key) {
+    const char *bson_buf = reinterpret_cast<const char *>(index_key->data);
+    mongo::BSONObj obj = mongo::BSONObj(bson_buf);
+    return obj;
+}
+
+// create a diskloc object from the given index key
+// the recordloc lives at the end of the index key buffer
+UNUSED
+static inline mongo::DiskLoc init_diskloc_from_dbt(const DBT *index_key) {
+    mongo::DiskLoc loc;
+    size_t size = sizeof(mongo::DiskLoc);
+    const char *buf = reinterpret_cast<const char *>(index_key->data);
+    memcpy(&loc, buf + index_key->size - size, size);
+    return loc;
+}
+
+// dump the contents of a dbt in a non-compact, visually pleasing way
+UNUSED
+static inline void dump_dbt(const char *name, const DBT *dbt) {
+    printf("%s: {\n", name);
+    printf("    size = %d,\n", dbt->size);
+    printf("    data = [");
+    for (uint32_t i = 0; i < dbt->size; i++) {
+        printf("%3u ", ((unsigned char *)dbt->data)[i]);
+    }
+    printf("]\n");
+    printf("}\n");
+}
+
+} /* namespace toku */
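
A standalone sketch of the [bson key | DiskLoc] key format above and of the two-part ordering the environment's comparison_function applies to it (illustration only, not part of the patch; strings stand in for BSON):

// index_key_sketch.cpp -- hypothetical demo, not from the patch
#include <stdint.h>
#include <cstdio>
#include <cstring>
#include <string>

struct Loc { uint32_t file, offset; };

// pack [key bytes | loc] into buf, which must be key.size() + sizeof(Loc)
static size_t pack(char *buf, const std::string &key, const Loc &loc) {
    memcpy(buf, key.data(), key.size());
    memcpy(buf + key.size(), &loc, sizeof loc);
    return key.size() + sizeof loc;
}

// compare by key bytes first, then by loc as the tiebreaker, so equal keys
// pointing at different documents remain distinct index entries
static int compareIndexKeys(const char *a, size_t a_keylen,
                            const char *b, size_t b_keylen) {
    int c = std::string(a, a_keylen).compare(std::string(b, b_keylen));
    if (c != 0) return c < 0 ? -1 : 1;
    Loc la, lb;
    memcpy(&la, a + a_keylen, sizeof la);
    memcpy(&lb, b + b_keylen, sizeof lb);
    if (la.file != lb.file) return la.file < lb.file ? -1 : 1;
    if (la.offset != lb.offset) return la.offset < lb.offset ? -1 : 1;
    return 0;
}

int main() {
    char k1[64], k2[64];
    Loc l1 = { 1, 100 }, l2 = { 1, 200 };
    pack(k1, "a:1", l1);
    pack(k2, "a:1", l2);    // same key, different document
    printf("compare -> %d\n", compareIndexKeys(k1, 3, k2, 3)); // -1: l1 < l2
    return 0;
}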
Index: src/mongo/db/toku/env.h
===================================================================
--- src/mongo/db/toku/env.h	(revision 0)
+++ src/mongo/db/toku/env.h	(revision 47136)
@@ -0,0 +1,44 @@
+/**
+* Copyright (C) 2012 Tokutek Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef TOKUDB_ENV_H
+#define TOKUDB_ENV_H
+
+#include <db.h>
+
+#include "db/index.h"
+#include "db/toku/invariant.h"
+
+namespace toku {
+    // if using txns and logging, begin the txn. else, set it to NULL
+    void env_maybe_begin_txn(DB_TXN **txn);
+
+    // if using txns and logging, commit the txn, else do nothing.
+    void env_maybe_commit_txn(DB_TXN *txn);
+
+    // given an idx, get a shared open DB handle
+    // effect: creates db if it didn't exist
+    DB *env_get_db_handle_by_idx(const mongo::IndexDetails &idx);
+
+    // given an idx, drop the associated db from the environment
+    void env_drop_index(const mongo::IndexDetails &idx);
+
+    // shutdown by closing all dictionaries and the environment
+    void env_shutdown(void);
+
+} /* namespace toku */
+
+#endif /* TOKUDB_ENV_H */
Index: src/mongo/db/index_update.cpp
===================================================================
--- src/mongo/db/index_update.cpp	(revision 46272)
+++ src/mongo/db/index_update.cpp	(revision 47136)
@@ -109,11 +109,45 @@
         }
     }
 
+    static void addKeysToIndex(const char *ns, NamespaceDetails *d, int idxNo, BSONObj& obj,
+                               DiskLoc recordLoc, bool dupsAllowed);
+
+    // tokudb: this is oldIndexRecord__notused.
+    // it doesn't do a search before inserting so it's what we need for tokudb indexes.
+    // some comments in pdfile.cpp say that the old version is possibly just as fast
+    // as indexRecordUsingTwoSteps even on mongodb's btree, so we'll just use this all the time.
+    static void indexRecordWithoutReading(const char *ns, NamespaceDetails *d, BSONObj obj, DiskLoc loc) {
+        int n = d->nIndexesBeingBuilt();
+        for ( int i = 0; i < n; i++ ) {
+            try {
+                bool unique = d->idx(i).unique();
+                addKeysToIndex(ns, d, i, obj, loc, /*dupsAllowed*/!unique);
+            }
+            catch( DBException& ) {
+                /* try to roll back previously added index entries
+                   note <= i (not < i) is important here as the index we just attempted
+                   may be multikey and require some cleanup.
+                */
+                for( int j = 0; j <= i; j++ ) {
+                    try {
+                        _unindexRecord(d->idx(j), obj, loc, false);
+                    }
+                    catch(...) {
+                        log(3) << "unindex fails on rollback after unique failure\n";
+                    }
+                }
+                throw;
+            }
+        }
+    }
+
     /** add index keys for a newly inserted record
         done in two steps/phases to allow potential deferral of write lock portion in the future
     */
     void indexRecordUsingTwoSteps(const char *ns, NamespaceDetails *d, BSONObj obj,
                                          DiskLoc loc, bool shouldBeUnlocked) {
+        return indexRecordWithoutReading(ns, d, obj, loc);
+#if 0
         vector<int> multi;
         vector<BSONObjSet> multiKeys;
 
@@ -169,6 +203,7 @@
             }
         }
     }
+#endif
     }
 
     /* add keys to index idxNo for a new record */
@@ -189,7 +224,13 @@
         }
         verify( !recordLoc.isNull() );
         try {
-            ii.bt_insert(idx.head, recordLoc, *i, ordering, dupsAllowed, idx);
+            if (idx.info.obj()["clustering"].trueValue()) {
+                // tokudb: call the clustering version. if the underlying index doesn't
+                // support clustering indexes, the interface defaults to regular bt_insert
+                ii.bt_insert_clustering(idx.head, recordLoc, *i, ordering, dupsAllowed, idx, obj);
+            } else {
+                ii.bt_insert(idx.head, recordLoc, *i, ordering, dupsAllowed, idx);
+            }
         }
         catch (AssertionException& e) {
             if( e.getCode() == 10287 && idxNo == d->nIndexes ) {
@@ -207,6 +248,38 @@
 
     SortPhaseOne *precalced = 0;
 
+    // tokudb: build an index using plain old insertions
+    void buildIndexUsingInsertions(bool dupsAllowed, IndexDetails& idx, BSONObjExternalSorter& sorter,
+        bool dropDups, set<DiskLoc> &dupsToDrop, CurOp * op, SortPhaseOne *phase1, ProgressMeterHolder &pm,
+        Timer& t
+        )
+    {
+        BSONObj keyLast;
+        auto_ptr<BSONObjExternalSorter::Iterator> i = sorter.iterator();
+        verify( pm == op->setMessage( "index: building index using regular insertions" , phase1->nkeys , 10 ) );
+        while( i->more() ) {
+            RARELY killCurrentOp.checkForInterrupt();
+            BSONObjExternalSorter::Data d = i->next();
+            const BSONObj key = d.first;
+            DiskLoc loc = d.second;
+            Ordering ordering = Ordering::make(idx.keyPattern());
+            int r = 0;
+            if (idx.info.obj()["clustering"].trueValue()) {
+                // tokudb: call the clustering version. if the underlying index doesn't
+                // support clustering indexes, the interface defaults to regular bt_insert
+                BSONObj obj = loc.obj(); // this gives us the document at the diskloc
+                r = idx.idxInterface().bt_insert_clustering(DiskLoc(), loc, key, ordering, true, idx, obj);
+            } else {
+                r = idx.idxInterface().bt_insert(DiskLoc(), loc, key, ordering, true, idx);
+            }
+            if (r != 0) {
+                problem() << " got error code " << r << " while building an index with insertions\n";
+            }
+            pm.hit();
+        }
+        pm.finished();
+    }
+
     template< class V >
     void buildBottomUpPhases2And3(bool dupsAllowed, IndexDetails& idx, BSONObjExternalSorter& sorter,
         bool dropDups, set<DiskLoc> &dupsToDrop, CurOp * op, SortPhaseOne *phase1, ProgressMeterHolder &pm,
         Timer& t
@@ -318,6 +391,9 @@
         buildBottomUpPhases2And3<V0>(dupsAllowed, idx, sorter, dropDups, dupsToDrop, op, phase1, pm, t);
     else if( idx.version() == 1 )
         buildBottomUpPhases2And3<V1>(dupsAllowed, idx, sorter, dropDups, dupsToDrop, op, phase1, pm, t);
+    else if( idx.version() == 2 )
+        // tokudb: to build our index, just trickle load rows using insertions
+        buildIndexUsingInsertions(dupsAllowed, idx, sorter, dropDups, dupsToDrop, op, phase1, pm, t);
     else
         verify(false);
 
@@ -491,7 +567,10 @@
 
         if( inDBRepair || !background ) {
             n = fastBuildIndex(ns.c_str(), d, idx, idxNo);
-            verify( !idx.head.isNull() );
+            // tokudb: we don't care if idx.head is null, so only verify if idx is not v2
+            if (idx.version() != 2) {
+                verify( !idx.head.isNull() );
+            }
         }
         else {
             BackgroundIndexBuildJob j(ns.c_str());
Index: src/mongo/db/instance.cpp
===================================================================
--- src/mongo/db/instance.cpp	(revision 46272)
+++ src/mongo/db/instance.cpp	(revision 47136)
@@ -52,7 +52,9 @@
 #include
 #include "dur_commitjob.h"
 #include "mongo/db/commands/fsync.h"
+#include "index.h"
+
 namespace mongo {
 
     // for diaglog
@@ -995,6 +997,9 @@
         MemoryMappedFile::closeAllFiles( ss3 );
         log() << ss3.str() << endl;
 
+        log() << "shutdown: shutting down the index interface..." << endl;
+        IndexInterface::shutdown();
+
         if( cmdLine.dur ) {
             dur::journalCleanup(true);
         }
Index: src/mongo/db/index.h
===================================================================
--- src/mongo/db/index.h	(revision 46272)
+++ src/mongo/db/index.h	(revision 47136)
@@ -35,6 +35,14 @@
     protected:
         virtual ~IndexInterface() { }
     public:
+
+        // tokudb: shut down the environment. used when the server shuts down
+        static void shutdown(void);
+
+        // tokudb: remove an index from the environment. used during kill_idx()
+        // so the environment can properly cleanup when mongodb drops an index.
+        virtual void dropIndex(const IndexDetails &idx) { }
+
         class IndexInserter : private boost::noncopyable {
         public:
             IndexInserter();
@@ -59,6 +67,12 @@
         virtual int bt_insert(const DiskLoc thisLoc, const DiskLoc recordLoc,
                 const BSONObj& key, const Ordering &order, bool dupsAllowed,
                 IndexDetails& idx, bool toplevel = true) const = 0;
+        // tokudb: bt_insert interface for clustering keys. defaults to regular bt_insert
+        virtual int bt_insert_clustering(const DiskLoc thisLoc, const DiskLoc recordLoc,
+                const BSONObj& key, const Ordering &order, bool dupsAllowed,
+                IndexDetails& idx, const BSONObj &obj, bool toplevel = true) const {
+            return bt_insert(thisLoc, recordLoc, key, order, dupsAllowed, idx, toplevel);
+        }
         virtual DiskLoc addBucket(const IndexDetails&) = 0;
         virtual void uassertIfDups(IndexDetails& idx, vector<BSONObj*>& addedKeys, DiskLoc head, DiskLoc self, const Ordering& ordering) = 0;
@@ -219,7 +233,8 @@
            it may not mean we can build the index version in question: we may not maintain building
            of indexes in old formats in the future.
         */
-        static bool isASupportedIndexVersionNumber(int v) { return (v&1)==v; } // v == 0 || v == 1
+        // tokudb: tokudb indexes are version 2
+        static bool isASupportedIndexVersionNumber(int v) { return (v&3)==v; } // v == 0 || v == 1 || v == 2
 
         /** @return the interface for this interface, which varies with the index version.
             used for backward compatibility of index versions/formats.
         */
@@ -227,7 +242,7 @@
         IndexInterface& idxInterface() const {
             int v = version();
             dassert( isASupportedIndexVersionNumber(v) );
-            return *iis[v&1];
+            return *iis[v&3];
         }
 
         static IndexInterface *iis[];
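
A standalone sketch of the version-dispatch table idxInterface() uses above (illustration only, not part of the patch): the patch widens the mask from v&1 (two slots) to v&3 and appends the tokudb implementation as slot 2. Note that (v&3)==v also admits v == 3, which has no populated slot; the sketch guards against that the same way the dassert does:

// version_dispatch_sketch.cpp -- hypothetical demo, not from the patch
#include <cstdio>

struct Iface { const char *name; };

static Iface iface_v0   = { "btree v0" };
static Iface iface_v1   = { "btree v1" };
static Iface iface_toku = { "tokudb" };

// one slot per version, mirroring iis[] = { &iii_v0, &iii_v1, &iii_tokudb }
static Iface *iis[] = { &iface_v0, &iface_v1, &iface_toku };

// only versions 0..2 actually have an interface behind them
static bool isASupportedIndexVersionNumber(int v) { return v >= 0 && v <= 2; }

int main() {
    for (int v = 0; v <= 3; v++) {
        if (isASupportedIndexVersionNumber(v))
            printf("version %d -> %s\n", v, iis[v & 3]->name);
        else
            printf("version %d -> unsupported\n", v);
    }
    return 0;
}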
Index: src/mongo/db/pdfile.cpp
===================================================================
--- src/mongo/db/pdfile.cpp	(revision 46272)
+++ src/mongo/db/pdfile.cpp	(revision 47136)
@@ -1095,6 +1095,9 @@
         d->paddingFits();
 
         /* have any index keys changed? */
+        // tokudb: mark which indexes have been updated, so we don't waste
+        // time doing a double update on a clustering index below.
+        vector<bool> index_updated(d->nIndexesBeingBuilt());
         {
             int keyUpdates = 0;
             int z = d->nIndexesBeingBuilt();
@@ -1118,12 +1121,21 @@
                     BSONObj idxKey = idx.info.obj().getObjectField("key");
                     Ordering ordering = Ordering::make(idxKey);
                     keyUpdates += changes[x].added.size();
+                    index_updated[x] = false;
                     for ( unsigned i = 0; i < changes[x].added.size(); i++ ) {
                         try {
                             /* we did the dupCheck() above. so we don't have to worry about it here. */
-                            ii.bt_insert(
-                                idx.head,
-                                dl, *changes[x].added[i], ordering, /*dupsAllowed*/true, idx);
+                            // tokudb: if this is clustering, then pass the new object with the insert
+                            index_updated[x] = true;
+                            if (idx.info.obj()["clustering"].trueValue()) {
+                                ii.bt_insert_clustering(
+                                    idx.head,
+                                    dl, *changes[x].added[i], ordering, /*dupsAllowed*/true, idx, objNew);
+                            } else {
+                                ii.bt_insert(
+                                    idx.head,
+                                    dl, *changes[x].added[i], ordering, /*dupsAllowed*/true, idx);
+                            }
                         }
                         catch (AssertionException& e) {
                             debug.extra << " exception update index ";
@@ -1135,6 +1147,32 @@
             debug.keyUpdates = keyUpdates;
         }
 
+        // for every clustering index over this collection, make sure we insert
+        // the new object as the clustered data. we can get the necessary keys
+        // using the IndexSpec and extracting the keys from the new object.
+        //
+        // this makes update({"a":1}, {$inc:{"b":1}}) work when there is
+        // a clustering index on "a". normally the "a" index would not
+        // get any updates about a change to "b", but it needs one here.
+        //
+        // we need to check if the index was not already updated though, otherwise
+        // we'd do duplicate work for something like update({"a":1}, {$inc:{"a":1}}).
+        for (int i = 0; i < d->nIndexesBeingBuilt(); i++) {
+            IndexDetails &other_idx = d->idx(i);
+            // update the clustered value only if it wasn't updated already
+            if (other_idx.info.obj()["clustering"].trueValue() && !index_updated[i]) {
+                BSONObjSet key_set;
+                other_idx.getKeysFromObject(objNew, key_set);
+                for (BSONObjSet::iterator key_i = key_set.begin(); key_i != key_set.end(); key_i++) {
+                    BSONObj idxKey = other_idx.info.obj().getObjectField("key");
+                    Ordering ordering = Ordering::make(idxKey);
+                    other_idx.idxInterface().bt_insert_clustering(
+                        other_idx.head,
+                        dl, *key_i, ordering, /*dupsAllowed*/ true, other_idx, objNew);
+                }
+            }
+        }
+
         // update in place
         int sz = objNew.objsize();
         memcpy(getDur().writingPtr(toupdate->data(), sz), objNew.objdata(), sz);
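
A standalone trace of the two-pass dedup the pdfile.cpp hunk above performs (illustration only, not part of the patch): pass 1 handles indexes whose keys changed; pass 2 refreshes the clustered document in clustering indexes pass 1 did not touch, using the index_updated flags to avoid double work:

// clustering_update_sketch.cpp -- hypothetical demo, not from the patch
#include <cstdio>
#include <vector>

struct Index { const char *name; bool clustering; bool key_changed; };

static Index makeIndex(const char *name, bool clustering, bool key_changed) {
    Index idx;
    idx.name = name;
    idx.clustering = clustering;
    idx.key_changed = key_changed;
    return idx;
}

int main() {
    std::vector<Index> indexes;
    indexes.push_back(makeIndex("a_clustering", true, false)); // key "a" unchanged
    indexes.push_back(makeIndex("b_plain", false, true));      // key "b" changed
    std::vector<bool> index_updated(indexes.size(), false);

    // pass 1: indexes whose keys changed get the normal key insert
    for (size_t i = 0; i < indexes.size(); i++) {
        if (indexes[i].key_changed) {
            printf("pass 1: insert new key into %s\n", indexes[i].name);
            index_updated[i] = true;
        }
    }
    // pass 2: clustering indexes untouched by pass 1 still need the new doc
    for (size_t i = 0; i < indexes.size(); i++) {
        if (indexes[i].clustering && !index_updated[i])
            printf("pass 2: rewrite clustered doc in %s\n", indexes[i].name);
    }
    return 0;
}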
Index: src/mongo/db/index.cpp
===================================================================
--- src/mongo/db/index.cpp	(revision 46272)
+++ src/mongo/db/index.cpp	(revision 47136)
@@ -27,9 +27,15 @@
 #include "ops/delete.h"
 #include "mongo/util/scopeguard.h"
-
+#include "db/toku/env.h"
+#include "db/toku/index.h"
 namespace mongo {
 
+    // tokudb: shutdown the environment
+    void IndexInterface::shutdown() {
+        toku::env_shutdown();
+    }
+
     IndexInterface::IndexInserter::IndexInserter() {}
     IndexInterface::IndexInserter::~IndexInserter() {
         for (size_t i = 0; i < _continuations.size(); ++i)
@@ -136,8 +142,9 @@
 
     IndexInterfaceImpl<V0> iii_v0;
     IndexInterfaceImpl<V1> iii_v1;
+    IndexInterfaceTokuDB iii_tokudb;
 
-    IndexInterface *IndexDetails::iis[] = { &iii_v0, &iii_v1 };
+    IndexInterface *IndexDetails::iis[] = { &iii_v0, &iii_v1, &iii_tokudb };
 
     int removeFromSysIndexes(const char *ns, const char *idxName) {
         string system_indexes = cc().database()->name + ".system.indexes";
@@ -197,6 +204,9 @@
 
         string name = indexName();
 
+        // tokudb: ensure the db is dropped in the environment using dropIndex
+        idxInterface().dropIndex(*this);
+
         /* important to catch exception here so we can finish cleanup below. */
         try {
             dropNS(ns.c_str());
         }
@@ -388,7 +398,7 @@
         // note (one day) we may be able to fresh build less versions than we can use
         // isASupportedIndexVersionNumber() is what we can use
         uassert(14803, str::stream() << "this version of mongod cannot build new indexes of version number " << vv,
-                vv == 0 || vv == 1);
+                vv == 0 || vv == 1 || vv == 2);
         v = (int) vv;
     }
     // idea is to put things we use a lot earlier
Index: src/mongo/SConscript
===================================================================
--- src/mongo/SConscript	(revision 46272)
+++ src/mongo/SConscript	(revision 47136)
@@ -290,6 +290,12 @@
         "db/dbcommands.cpp",
         "db/dbcommands_admin.cpp",
 
+        # tokudb
+        "db/toku/env.cpp",
+        "db/toku/index.cpp",
+        "db/toku/cursor.cpp",
+        "db/toku/row_buffer.cpp",
+
         # most commands are only for mongod
         "db/commands/fsync.cpp",
         "db/commands/distinct.cpp",
Index: SConstruct
===================================================================
--- SConstruct	(revision 46272)
+++ SConstruct	(revision 47136)
@@ -769,6 +769,11 @@
 
 env.Append( CPPPATH=['$EXTRACPPPATH'], LIBPATH=['$EXTRALIBPATH'] )
 
+# tokudb
+env.Append(CPPPATH=['/home/esmet/tokudb/release/include'])
+env.Append(LIBPATH=['/home/esmet/tokudb/release/lib'])
+env.Append(LIBS=['libtokudb_static.a', 'libtokuportability_static.a', 'dl', 'z'])
+
 # --- check system ---
 def doConfigure(myenv):