Commit 7f0b3f54 authored by nkindlon's avatar nkindlon
Browse files

First check-in for multi-sweep

parent ccd125c1
......@@ -3,8 +3,7 @@
BinTree::BinTree(ContextIntersect *context)
: _databaseFile(NULL),
_context(context),
: _context(context),
_binOffsetsExtended(NULL),
_showBinMetrics(false),
_maxBinNumFound(0)
......@@ -36,7 +35,7 @@ BinTree::~BinTree() {
}
for (innerListIterType listIter = bin->begin(); listIter != bin->end(); listIter = bin->next()) {
const Record *record = listIter->value();
_databaseFile->deleteRecord(record);
_context->getFile(record->getFileIdx())->deleteRecord(record);
}
delete bin;
bin = NULL;
......@@ -70,20 +69,22 @@ BinTree::~BinTree() {
void BinTree::loadDB()
{
_databaseFile = _context->getFile(_context->getDatabaseFileIdx());
Record *record = NULL;
while (!_databaseFile->eof()) {
record = _databaseFile->getNextRecord();
//In addition to NULL records, we also don't want to add unmapped reads.
if (record == NULL || record->isUnmapped()) {
continue;
}
for (int i=0; i < _context->getNumDatabaseFiles(); i++) {
FileRecordMgr *databaseFile = _context->getDatabaseFile(i);
Record *record = NULL;
while (!databaseFile->eof()) {
record = databaseFile->getNextRecord();
//In addition to NULL records, we also don't want to add unmapped reads.
if (record == NULL || record->isUnmapped()) {
continue;
}
if (!addRecordToTree(record)) {
fprintf(stderr, "ERROR: Unable to add record to tree.\n");
_databaseFile->close();
exit(1);
if (!addRecordToTree(record)) {
fprintf(stderr, "ERROR: Unable to add record to tree.\n");
databaseFile->close();
exit(1);
}
}
}
}
......
......@@ -32,7 +32,6 @@ public:
private:
FileRecordMgr *_databaseFile;
ContextIntersect *_context;
//
......
......@@ -41,7 +41,6 @@ ContextBase::ContextBase()
_printable(true),
_explicitBedOutput(false),
_queryFileIdx(-1),
_databaseFileIdx(-1),
_bamHeaderAndRefIdx(-1),
_maxNumDatabaseFields(0),
_useFullBamTags(false),
......@@ -210,7 +209,11 @@ bool ContextBase::isValidState()
return false;
}
if (hasColumnOpsMethods()) {
FileRecordMgr *dbFile = getFile(hasIntersectMethods() ? _databaseFileIdx : 0);
//TBD: Adjust column ops for multiple databases.
//For now, use last file.
// FileRecordMgr *dbFile = getFile(hasIntersectMethods() ? _databaseFileIdx : 0);
FileRecordMgr *dbFile = getFile(getNumInputFiles()-1);
_keyListOps->setDBfileType(dbFile->getFileType());
if (!_keyListOps->isValidColumnOps(dbFile)) {
return false;
......@@ -251,7 +254,7 @@ bool ContextBase::openFiles() {
_files.resize(_fileNames.size());
for (int i = 0; i < (int)_fileNames.size(); i++) {
FileRecordMgr *frm = getNewFRM(_fileNames[i]);
FileRecordMgr *frm = getNewFRM(_fileNames[i], i);
if (hasGenomeFile()) {
frm->setGenomeFile(_genomeFile);
}
......@@ -281,7 +284,7 @@ int ContextBase::getBamHeaderAndRefIdx() {
if (_files[_queryFileIdx]->getFileType() == FileRecordTypeChecker::BAM_FILE_TYPE) {
_bamHeaderAndRefIdx = _queryFileIdx;
} else {
_bamHeaderAndRefIdx = _databaseFileIdx;
_bamHeaderAndRefIdx = _dbFileIdxs[0];
}
return _bamHeaderAndRefIdx;
}
......@@ -508,13 +511,17 @@ const QuickString &ContextBase::getColumnOpsVal(RecordKeyList &keyList) const {
return _keyListOps->getOpVals(keyList);
}
FileRecordMgr *ContextBase::getNewFRM(const QuickString &filename) {
if (!_useMergedIntervals) {
return new FileRecordMgr(filename);
} else {
FileRecordMgr *ContextBase::getNewFRM(const QuickString &filename, int fileIdx) {
if (_useMergedIntervals) {
FileRecordMergeMgr *frm = new FileRecordMergeMgr(filename);
frm->setStrandType(_desiredStrand);
frm->setMaxDistance(_maxDistance);
frm->setFileIdx(fileIdx);
return frm;
} else {
FileRecordMgr *frm = new FileRecordMgr(filename);
frm->setFileIdx(fileIdx);
return frm;
}
}
......
......@@ -196,7 +196,7 @@ protected:
bool _printable;
bool _explicitBedOutput;
int _queryFileIdx;
int _databaseFileIdx;
vector<int> _dbFileIdxs;
int _bamHeaderAndRefIdx;
int _maxNumDatabaseFields;
bool _useFullBamTags;
......@@ -227,7 +227,7 @@ protected:
bool isUsed(int i) const { return _argsProcessed[i]; }
bool cmdArgsValid();
bool openFiles();
virtual FileRecordMgr *getNewFRM(const QuickString &filename);
virtual FileRecordMgr *getNewFRM(const QuickString &filename, int fileIdx);
//set cmd line params and counter, i, as members so code
//is more readable (as opposed to passing all 3 everywhere).
......
......@@ -92,7 +92,7 @@ bool ContextIntersect::isValidState()
return false;
}
if (_queryFileIdx == -1 || _databaseFileIdx == -1) {
if (_queryFileIdx == -1 || _dbFileIdxs.size() == -0) {
_errorMsg = "\n***** ERROR: query and database files not specified. *****";
return false;
}
......@@ -149,7 +149,7 @@ bool ContextIntersect::isValidState()
if (getAnyHit() || getNoHit() || getWriteCount()) {
setPrintable(false);
}
if (_files.size() != 2 ) {
if (_files.size() < 2 ) {
return false;
}
return true;
......@@ -161,8 +161,8 @@ bool ContextIntersect::determineOutputType() {
}
//determine the maximum number of database fields.
for (int i=0; i < (int)_files.size(); i++) {
int numFields = _files[i]->getNumFields();
for (int i=0; i < getNumDatabaseFiles(); i++) {
int numFields = getDatabaseFile(i)->getNumFields();
if ( numFields > _maxNumDatabaseFields) {
_maxNumDatabaseFields = numFields;
}
......@@ -215,11 +215,14 @@ bool ContextIntersect::handle_b()
return false;
}
addInputFile(_argv[_i+1]);
_databaseFileIdx = getNumInputFiles() -1;
markUsed(_i - _skipFirstArgs);
_i++;
markUsed(_i - _skipFirstArgs);
do {
addInputFile(_argv[_i+1]);
int dbFileIdx = getNumInputFiles() -1;
_dbFileIdxs.push_back(dbFileIdx);
markUsed(_i - _skipFirstArgs);
_i++;
markUsed(_i - _skipFirstArgs);
} while (_argc > _i+1 && _argv[_i+1][0] != '-');
return true;
}
......
......@@ -22,17 +22,17 @@ public:
//NOTE: Query and database files will only be marked as such by either the
//parseCmdArgs method, or by explicitly setting them.
FileRecordMgr *getQueryFile() { return getFile(_queryFileIdx); }
FileRecordMgr *getDatabaseFile() { return getFile(_databaseFileIdx); }
FileRecordMgr *getDatabaseFile(int idx) { return getFile(_dbFileIdxs[idx]); }
int getQueryFileIdx() const { return _queryFileIdx; }
void setQueryFileIdx(int idx) { _queryFileIdx = idx; }
int getDatabaseFileIdx() const { return _databaseFileIdx; }
void setDatabaseFileIdx(int idx) { _databaseFileIdx = idx; }
int getNumDatabaseFiles() { return (int)_dbFileIdxs.size(); }
const vector<int> &getDbFileIdxs() const { return _dbFileIdxs; }
const QuickString &getQueryFileName() const { return _files[_queryFileIdx]->getFileName(); }
const QuickString &getDatabaseFileName() const { return _files[_databaseFileIdx]->getFileName(); }
const QuickString &getDatabaseFileName(int idx) const { return _files[_dbFileIdxs[idx]]->getFileName(); }
ContextFileType getQueryFileType() const { return _files[_queryFileIdx]->getFileType(); }
ContextFileType getDatabaseFileType() const { return _files[_databaseFileIdx]->getFileType(); }
ContextFileType getDatabaseFileType(int idx) const { return _files[_dbFileIdxs[idx]]->getFileType(); }
ContextRecordType getQueryRecordType() const { return _files[_queryFileIdx]->getRecordType(); }
ContextRecordType getDatabaseRecordType() const { return _files[_databaseFileIdx]->getRecordType(); }
ContextRecordType getDatabaseRecordType(int idx) const { return _files[_dbFileIdxs[idx]]->getRecordType(); }
int getMaxNumDatabaseFields() const { return _maxNumDatabaseFields; }
void setMaxNumDatabaseFields(int val) { _maxNumDatabaseFields = val; }
......
......@@ -4,7 +4,7 @@
#include "BufferedStreamMgr.h"
FileReader::FileReader()
:
: _fileIdx(-1),
_bufStreamMgr(NULL),
_isFileOpen(false),
_currChromId(-1)
......
......@@ -16,6 +16,9 @@ public:
FileReader();
virtual ~FileReader();
void setFileName(const string &filename) { _filename = filename; }
virtual int getFileIdx() const { return _fileIdx; }
virtual void setFileIdx(int fileIdx) { _fileIdx = fileIdx; }
void setInputStream(BufferedStreamMgr *bufStreamMgr) {
_bufStreamMgr = bufStreamMgr;
_isFileOpen = true;
......@@ -31,6 +34,7 @@ public:
virtual const QuickString &getHeader() const =0;
virtual int getNumFields() const = 0;
protected:
int _fileIdx;
string _filename;
BufferedStreamMgr *_bufStreamMgr;
......
......@@ -5,7 +5,7 @@
#include "NewGenomeFile.h"
FileRecordMgr::FileRecordMgr(const QuickString &filename)
:
: _fileIdx(-1),
_filename(filename),
_bufStreamMgr(NULL),
_fileReader(NULL),
......@@ -224,6 +224,7 @@ void FileRecordMgr::allocateFileReader()
default:
break;
}
_fileReader->setFileIdx(_fileIdx);
}
const BamTools::RefVector & FileRecordMgr::getBamReferences() {
......
......@@ -37,6 +37,9 @@ public:
bool open();
void close();
virtual bool eof();
void setFileIdx(int fileIdx) { _fileIdx = fileIdx; }
int getFileIdx() const { return _fileIdx; }
//This is an all-in-one method to give the user a new record that is initialized with
//the next entry in the data file.
......@@ -46,6 +49,7 @@ public:
void deleteRecord(const Record *);
virtual void deleteRecord(RecordKeyList *keyList);
const QuickString &getFileName() const { return _filename;}
bool hasHeader() const { return _fileReader->hasHeader(); }
const QuickString &getHeader() const { return _fileReader->getHeader(); }
......@@ -103,6 +107,7 @@ public:
void setIoBufSize(int val) { _ioBufSize = val; }
protected:
int _fileIdx;
QuickString _filename;
BufferedStreamMgr *_bufStreamMgr;
......
......@@ -45,6 +45,7 @@ bool BamRecord::initFromFile(FileReader *fileReader)
bool BamRecord::initFromFile(BamFileReader *bamFileReader)
{
setFileIdx(bamFileReader->getFileIdx());
_bamAlignment = bamFileReader->getAlignment();
bamFileReader->getChrName(_chrName);
......
......@@ -23,6 +23,7 @@ bool Bed3Interval::initFromFile(FileReader *fileReader)
bool Bed3Interval::initFromFile(SingleLineDelimTextFileReader *fileReader)
{
setFileIdx(fileReader->getFileIdx());
fileReader->getField(0, _chrName);
fileReader->getField(1, _startPosStr);
fileReader->getField(2, _endPosStr);
......
......@@ -23,6 +23,7 @@ void GffRecord::clear()
bool GffRecord::initFromFile(SingleLineDelimTextFileReader *fileReader)
{
setFileIdx(fileReader->getFileIdx());
fileReader->getField(0, _chrName);
fileReader->getField(3, _startPosStr);
_startPos = str2chrPos(_startPosStr);
......
......@@ -3,7 +3,8 @@
#include <cstdio>
Record::Record()
: _chrId(-1),
: _fileIdx(-1),
_chrId(-1),
_startPos(-1),
_endPos(-1),
_strandVal(UNKNOWN),
......@@ -18,6 +19,7 @@ Record::~Record() {
const Record &Record::operator=(const Record &other)
{
_fileIdx = other._fileIdx;
_chrName = other._chrName;
_chrId = other._chrId;
_startPos = other._startPos;
......@@ -29,6 +31,7 @@ const Record &Record::operator=(const Record &other)
}
void Record::clear() {
_fileIdx = -1;
_chrName.clear();
_chrId = -1;
_startPos = -1;
......
......@@ -42,6 +42,8 @@ public:
virtual void setChrName(const string &chr) { _chrName = chr; }
virtual void setChrName(const char *chr) { _chrName = chr; }
virtual int getFileIdx() const { return _fileIdx; }
virtual void setFileIdx(int fileIdx) { _fileIdx = fileIdx; }
virtual int getChromId() const { return _chrId; }
virtual void setChromId(int id) { _chrId = id; }
......@@ -135,6 +137,7 @@ public:
protected:
virtual ~Record(); //by making the destructor protected, only the friend class(es) can actually delete Record objects, or objects derived from Record.
int _fileIdx; //associated file the record came from
QuickString _chrName;
int _chrId;
int _startPos;
......
......@@ -10,6 +10,7 @@
bool VcfRecord::initFromFile(SingleLineDelimTextFileReader *fileReader)
{
setFileIdx(fileReader->getFileIdx());
fileReader->getField(0, _chrName);
_chrId = fileReader->getCurrChromdId();
fileReader->getField(1, _startPosStr);
......
......@@ -18,32 +18,35 @@ NewChromSweep::NewChromSweep(ContextIntersect *context,
bool useMergedIntervals)
: _context(context),
_queryFRM(NULL),
_databaseFRM(NULL),
_useMergedIntervals(false),
_numDBs(_context->getNumDatabaseFiles()),
_useMergedIntervals(false),
_queryRecordsTotalLength(0),
_databaseRecordsTotalLength(0),
_wasInitialized(false),
_currQueryRec(NULL),
_currDatabaseRec(NULL),
_runToQueryEnd(false)
{
}
bool NewChromSweep::init() {
//Create new FileRecordMgrs for the two input files.
//Create new FileRecordMgrs for the input files.
//Open them, and get the first record from each.
//if any of that goes wrong, return false;
//otherwise, return true.
_queryFRM = _context->getFile(_context->getQueryFileIdx());
_databaseFRM = _context->getFile(_context->getDatabaseFileIdx());
nextRecord(false);
// if (_currDatabaseRec == NULL) {
// return false;
// }
_dbFRMs.resize(_numDBs, NULL);
for (int i=0; i < _numDBs; i++) {
_dbFRMs[i] = _context->getDatabaseFile(i);
}
_currDbRecs.resize(_numDBs, NULL);
for (int i=0; i < _numDBs; i++) {
nextRecord(false, i);
}
_caches.resize(_numDBs);
//determine whether to stop when the database end is hit, or keep going until the
//end of the query file is hit as well.
......@@ -59,9 +62,12 @@ void NewChromSweep::closeOut() {
while (!_queryFRM->eof()) {
nextRecord(true);
}
while (!_databaseFRM->eof()) {
nextRecord(false);
}
for (int i=0; i < _numDBs; i++) {
while (!_dbFRMs[i]->eof()) {
nextRecord(false, i);
}
}
}
NewChromSweep::~NewChromSweep(void) {
......@@ -71,136 +77,143 @@ NewChromSweep::~NewChromSweep(void) {
_queryFRM->deleteRecord(_currQueryRec);
_currQueryRec = NULL;
_databaseFRM->deleteRecord(_currDatabaseRec);
_currDatabaseRec = NULL;
for (int i=0; i < _numDBs; i++) {
_dbFRMs[i]->deleteRecord(_currDbRecs[i]);
_currDbRecs[i] = NULL;
}
_queryFRM->close();
_databaseFRM->close();
for (int i=0; i < _numDBs; i++) {
_dbFRMs[i]->close();
}
}
void NewChromSweep::scanCache() {
recListIterType cacheIter = _cache.begin();
while (cacheIter != _cache.end())
void NewChromSweep::scanCache(int dbIdx) {
recListIterType cacheIter = _caches[dbIdx].begin();
while (cacheIter != _caches[dbIdx].end())
{
const Record *cacheRec = cacheIter->value();
if (_currQueryRec->sameChrom(cacheRec) && !(_currQueryRec->after(cacheRec))) {
if (intersects(_currQueryRec, cacheRec)) {
_hits.push_back(cacheRec);
}
cacheIter = _cache.next();
cacheIter = _caches[dbIdx].next();
}
else {
cacheIter = _cache.deleteCurrent();
_databaseFRM->deleteRecord(cacheRec);
cacheIter = _caches[dbIdx].deleteCurrent();
_dbFRMs[dbIdx]->deleteRecord(cacheRec);
}
}
}
void NewChromSweep::clearCache()
void NewChromSweep::clearCache(int dbIdx)
{
//delete all objects pointed to by cache
for (recListIterType iter = _cache.begin(); iter != _cache.end(); iter = _cache.next()) {
_databaseFRM->deleteRecord(iter->value());
recListType &cache = _caches[dbIdx];
for (recListIterType iter = cache.begin(); iter != cache.end(); iter = cache.next()) {
_dbFRMs[dbIdx]->deleteRecord(iter->value());
}
_cache.clear();
cache.clear();
}
bool NewChromSweep::chromChange()
void NewChromSweep::masterScan() {
for (int i=0; i < _numDBs; i++) {
if (dbFinished(i) || chromChange(i)) {
continue;
} else {
// scan the database cache for hits
scanCache(i);
//skip if we hit the end of the DB
// advance the db until we are ahead of the query. update hits and cache as necessary
while (_currDbRecs[i] != NULL &&
_currQueryRec->sameChrom(_currDbRecs[i]) &&
!(_currDbRecs[i]->after(_currQueryRec))) {
if (intersects(_currQueryRec, _currDbRecs[i])) {
_hits.push_back(_currDbRecs[i]);
}
if (_currQueryRec->after(_currDbRecs[i])) {
_dbFRMs[i]->deleteRecord(_currDbRecs[i]);
_currDbRecs[i] = NULL;
} else {
_caches[i].push_back(_currDbRecs[i]);
_currDbRecs[i] = NULL;
}
nextRecord(false, i);
}
}
}
}
bool NewChromSweep::chromChange(int dbIdx)
{
// the files are on the same chrom
if (_currDatabaseRec == NULL || _currQueryRec->sameChrom(_currDatabaseRec)) {
if (_currDbRecs[dbIdx] == NULL || _currQueryRec->sameChrom(_currDbRecs[dbIdx])) {
return false;
}
// the query is ahead of the database. fast-forward the database to catch-up.
if (_currQueryRec->chromAfter(_currDatabaseRec)) {
if (_currQueryRec->chromAfter(_currDbRecs[dbIdx])) {
while (_currDatabaseRec != NULL &&
_currQueryRec->chromAfter(_currDatabaseRec)) {
_databaseFRM->deleteRecord(_currDatabaseRec);
nextRecord(false);
while (_currDbRecs[dbIdx] != NULL &&
_currQueryRec->chromAfter(_currDbRecs[dbIdx])) {
_dbFRMs[dbIdx]->deleteRecord(_currDbRecs[dbIdx]);
nextRecord(false, dbIdx);
}
clearCache();
clearCache(dbIdx);
return false;
}
// the database is ahead of the query.
else {
// 1. scan the cache for remaining hits on the query's current chrom.
if (_currQueryRec->getChrName() == _currChromName)
{
scanCache();
}
// 2. fast-forward until we catch up and report 0 hits until we do.
else if (_currQueryRec->chromBefore(_currDatabaseRec))
{
clearCache();
}
scanCache(dbIdx);
return true;
}
//control can't reach here, but compiler still wants a return statement.
return true;
}
bool NewChromSweep::next(RecordKeyList &next) {
bool NewChromSweep::next(RecordKeyList &retList) {
if (_currQueryRec != NULL) {
_queryFRM->deleteRecord(_currQueryRec);
}
nextRecord(true);
if (_currQueryRec == NULL) { //eof hit!
return false;
}
if (_currDatabaseRec == NULL && _cache.empty() && !_runToQueryEnd) {
if (!nextRecord(true)) return false; // query EOF hit
if (allCurrDBrecsNull() && allCachesEmpty() && !_runToQueryEnd) {
return false;
}
_hits.clear();
_currChromName = _currQueryRec->getChrName();
// have we changed chromosomes?
if (!chromChange()) {
// scan the database cache for hits
scanCache();
//skip if we hit the end of the DB
// advance the db until we are ahead of the query. update hits and cache as necessary
while (_currDatabaseRec != NULL &&
_currQueryRec->sameChrom(_currDatabaseRec) &&
!(_currDatabaseRec->after(_currQueryRec))) {
if (intersects(_currQueryRec, _currDatabaseRec)) {
_hits.push_back(_currDatabaseRec);
}
if (_currQueryRec->after(_currDatabaseRec)) {
_databaseFRM->deleteRecord(_currDatabaseRec);
_currDatabaseRec = NULL;