diff --git a/converter/Gopkg.lock b/converter/Gopkg.lock index 569b8e7..5262423 100644 --- a/converter/Gopkg.lock +++ b/converter/Gopkg.lock @@ -29,15 +29,15 @@ version = "v0.1.1" [[projects]] - digest = "1:8d688914c687eedb46c74d8cfb05e3a351fc3b914ec03f7339178ced760a2d69" + digest = "1:10de4a36895028cd46843f261fa4d62adeebc83060781497e861ea6db0aa92ce" name = "github.com/activecm/rita" packages = [ "config", "parser/parsetypes", ] pruneopts = "" - revision = "ccc5ca0db8ed830ed2901eebb4843e95da1e1ba4" - version = "v1.1.1" + revision = "c279d298ca2128f9d0c9a38b93661272ce115a64" + version = "v2.0.0" [[projects]] branch = "master" @@ -55,6 +55,14 @@ revision = "2ee87856327ba09384cabd113bc6b5d174e9ec0f" version = "v3.5.1" +[[projects]] + digest = "1:cba3e5921fe4f8a4c80374e1f6679a5b0615a0399d5f7d804b5a85db40c0c7eb" + name = "github.com/creasty/defaults" + packages = ["."] + pruneopts = "" + revision = "edf4f6a95a3223dfff4eff2c404103e756fef68d" + version = "v1.3.0" + [[projects]] digest = "1:0deddd908b6b4b768cfc272c16ee61e7088a60f7fe2f06c547bd3d8e1f8b8e77" name = "github.com/davecgh/go-spew" @@ -262,6 +270,7 @@ "github.com/globalsign/mgo/bson", "github.com/pkg/errors", "github.com/sirupsen/logrus", + "github.com/stretchr/testify/assert", "github.com/stretchr/testify/require", "github.com/urfave/cli", "gopkg.in/yaml.v2", diff --git a/converter/Gopkg.toml b/converter/Gopkg.toml index bf40b05..ca510df 100644 --- a/converter/Gopkg.toml +++ b/converter/Gopkg.toml @@ -26,7 +26,7 @@ [[constraint]] name = "github.com/activecm/rita" - version = "^1.0.3" + version = "^2.0.0" [[constraint]] name = "github.com/activecm/mgosec" diff --git a/converter/commands/check_config.go b/converter/commands/check_config.go index dcfb4bf..d933c27 100644 --- a/converter/commands/check_config.go +++ b/converter/commands/check_config.go @@ -2,12 +2,12 @@ package commands import ( "fmt" - "github.com/activecm/ipfix-rita/converter/config" "github.com/activecm/ipfix-rita/converter/config/yaml" "github.com/activecm/ipfix-rita/converter/input/logstash/mongodb" "github.com/activecm/ipfix-rita/converter/output/rita" "github.com/urfave/cli" + "time" ) func init() { @@ -53,7 +53,7 @@ func init() { fmt.Printf("Found %d Flow Records Ready For Processing\n", count) coll.Database.Session.Close() - outDB, err := rita.NewOutputDB(conf.GetOutputConfig().GetRITAConfig()) + outDB, err := rita.NewDBManager(conf.GetOutputConfig().GetRITAConfig(), 100, 1*time.Second) if err != nil { return cli.NewExitError(fmt.Sprintf("%+v\n", err), 1) } diff --git a/converter/config/config.go b/converter/config/config.go index f6c95d3..973297d 100644 --- a/converter/config/config.go +++ b/converter/config/config.go @@ -77,6 +77,13 @@ type RITA interface { GetConnectionConfig() MongoDBConnection GetDBRoot() string GetMetaDB() string + GetStrobe() Strobe +} + +//Strobe contains configuration for populating the +//freqConn collection / Strobes analysis in RITA +type Strobe interface { + GetConnectionLimit() int } //Filtering contains information on local subnets and other networks/hosts diff --git a/converter/config/yaml/output.go b/converter/config/yaml/output.go index 6d89d00..0678c24 100644 --- a/converter/config/yaml/output.go +++ b/converter/config/yaml/output.go @@ -16,6 +16,7 @@ type ritaMongoDB struct { MongoDB mongoDBConnection `yaml:"MongoDB-Connection"` DBRoot string `yaml:"DBRoot"` MetaDB string `yaml:"MetaDB"` + Strobe strobe `yaml:"Strobe"` } func (r *ritaMongoDB) GetConnectionConfig() config.MongoDBConnection { @@ -29,3 +30,16 @@ func (r *ritaMongoDB) GetDBRoot() string { func (r *ritaMongoDB) GetMetaDB() string { return r.MetaDB } + +func (r *ritaMongoDB) GetStrobe() config.Strobe { + return &r.Strobe +} + +//strobe implements config.Strobe +type strobe struct { + ConnectionLimit int `yaml:"ConnectionLimit"` +} + +func (s *strobe) GetConnectionLimit() int { + return s.ConnectionLimit +} diff --git a/converter/etc/config.yaml b/converter/etc/config.yaml index cf73b81..4872622 100644 --- a/converter/etc/config.yaml +++ b/converter/etc/config.yaml @@ -16,6 +16,21 @@ Output: # This database holds information about RITA managed databases. MetaDB: MetaDatabase + Strobe: + # This sets the maximum number of connections between any two given hosts that are stored. + # Connections above this limit will be deleted and not used in other analysis modules. This will + # also trigger an entry in the strobe module. A lower value will reduce analysis time and + # hide more potential false positives from other modules. A higher value will increase + # analysis time, increase false positives, but reduce the risk of false negatives. + # Recommended values for this setting are: + # 86400 - One connection every second for 24 hours + # 250000 - (Default) Good middle of the road value + # 700000 - Safe max value that is unlikely to cause errors + # The theoretical limit due to implementation limitations is ~1,048,573 + # but in practice timeouts have occurred at lower values. + ConnectionLimit: 250000 + + Filtering: # These are filters that affect which flows are processed and which # are dropped. diff --git a/converter/integrationtest/config.go b/converter/integrationtest/config.go index 7f38606..43f20c6 100644 --- a/converter/integrationtest/config.go +++ b/converter/integrationtest/config.go @@ -70,12 +70,21 @@ func (t *OutputConfig) GetRITAConfig() config.RITA { return &t.rita } //RitaConfig implements config.RITA type RitaConfig struct { mongoDB MongoDBConfig + strobe StrobeConfig } func (r *RitaConfig) GetConnectionConfig() config.MongoDBConnection { return &r.mongoDB } -func (r *RitaConfig) GetDBRoot() string { return "RITA" } -func (r *RitaConfig) GetMetaDB() string { return "MetaDatabase" } +func (r *RitaConfig) GetDBRoot() string { return "RITA" } +func (r *RitaConfig) GetMetaDB() string { return "MetaDatabase" } +func (r *RitaConfig) GetStrobe() config.Strobe { return &r.strobe } + +type StrobeConfig struct{} + +func (s *StrobeConfig) GetConnectionLimit() int { + //Lowered from the usual 250000 so tests involving this limit don't take forever. + return 100 +} //FilteringConfig implements config.Filtering type FilteringConfig struct{} diff --git a/converter/output/rita/batch/dates/rita_dates.go b/converter/output/rita/batch/dates/rita_dates.go index f071951..40d2d6b 100644 --- a/converter/output/rita/batch/dates/rita_dates.go +++ b/converter/output/rita/batch/dates/rita_dates.go @@ -9,7 +9,6 @@ import ( "github.com/activecm/ipfix-rita/converter/logging" "github.com/activecm/ipfix-rita/converter/output" "github.com/activecm/ipfix-rita/converter/output/rita" - "github.com/activecm/ipfix-rita/converter/output/rita/buffered" "github.com/activecm/ipfix-rita/converter/stitching/session" "github.com/activecm/rita/parser/parsetypes" "github.com/pkg/errors" @@ -23,14 +22,12 @@ import ( //before being sent to MongoDB. The buffers are flushed when //they are full or after a deadline passes for the individual buffer. type batchRITAConnDateWriter struct { - db rita.OutputDB - localNets []net.IPNet - outputCollections map[string]*buffered.AutoFlushCollection - bufferSize int64 - autoFlushTime time.Duration - autoFlushContext context.Context - autoFlushOnFatal func() - log logging.Logger + db rita.DBManager + localNets []net.IPNet + outputDBs map[string]rita.DB + autoFlushContext context.Context + autoFlushOnFatal func() + log logging.Logger } //NewBatchRITAConnDateWriter creates an buffered RITA compatible writer @@ -40,7 +37,7 @@ type batchRITAConnDateWriter struct { //when the buffer is full or after a deadline passes. func NewBatchRITAConnDateWriter(ritaConf config.RITA, localNets []net.IPNet, bufferSize int64, autoFlushTime time.Duration, log logging.Logger) (output.SessionWriter, error) { - db, err := rita.NewOutputDB(ritaConf) + db, err := rita.NewDBManager(ritaConf, bufferSize, autoFlushTime) if err != nil { return nil, errors.Wrap(err, "could not connect to RITA MongoDB") } @@ -48,14 +45,12 @@ func NewBatchRITAConnDateWriter(ritaConf config.RITA, localNets []net.IPNet, autoFlushContext, autoFlushOnFail := context.WithCancel(context.Background()) //return the new writer return &batchRITAConnDateWriter{ - db: db, - localNets: localNets, - outputCollections: make(map[string]*buffered.AutoFlushCollection), - bufferSize: bufferSize, - autoFlushTime: autoFlushTime, - autoFlushContext: autoFlushContext, - autoFlushOnFatal: autoFlushOnFail, - log: log, + db: db, + localNets: localNets, + outputDBs: make(map[string]rita.DB), + autoFlushContext: autoFlushContext, + autoFlushOnFatal: autoFlushOnFail, + log: log, }, nil } @@ -90,14 +85,14 @@ func (r *batchRITAConnDateWriter) Write(sessions <-chan *session.Aggregate) <-ch sess.ToRITAConn(&connRecord, r.isIPLocal) //create/ get the buffered output collection - outColl, err := r.getConnCollectionForSession(sess, errs, r.autoFlushOnFatal) + outDB, err := r.getDBForSession(sess, errs, r.autoFlushOnFatal) if err != nil { errs <- err break WriteLoop } //insert the record - err = outColl.Insert(connRecord) + err = outDB.InsertConnRecord(&connRecord) if err != nil { errs <- err break WriteLoop @@ -109,11 +104,13 @@ func (r *batchRITAConnDateWriter) Write(sessions <-chan *session.Aggregate) <-ch } func (r *batchRITAConnDateWriter) closeDBSessions(errs chan<- error) { - for i := range r.outputCollections { - r.outputCollections[i].Close() + for i := range r.outputDBs { + err := r.outputDBs[i].Close() + if err != nil { + errs <- err + } - err := r.db.MarkImportFinishedInMetaDB(r.outputCollections[i].Database()) - //stops outputCollections from sending on errs + err = r.outputDBs[i].MarkFinished() if err != nil { errs <- err } @@ -132,39 +129,28 @@ func (r *batchRITAConnDateWriter) isIPLocal(ipAddrStr string) bool { return false } -func (r *batchRITAConnDateWriter) getConnCollectionForSession(sess *session.Aggregate, - autoFlushAsyncErrChan chan<- error, autoFlushOnFatal func()) (*buffered.AutoFlushCollection, error) { +func (r *batchRITAConnDateWriter) getDBForSession(sess *session.Aggregate, + autoFlushAsyncErrChan chan<- error, autoFlushOnFatal func()) (rita.DB, error) { //get the latest flowEnd time endTimeMilliseconds := sess.FlowEndMilliseconds() //time.Unix(seconds, nanoseconds) - //1000 milliseconds per second, 1000 nanosecodns to a microsecond. 1000 microseconds to a millisecond + //1000 milliseconds per second, 1000 nanoseconds to a microsecond. 1000 microseconds to a millisecond endTime := time.Unix(endTimeMilliseconds/1000, (endTimeMilliseconds%1000)*1000*1000) endTimeStr := endTime.Format("2006-01-02") //cache the database connection - outBufferedColl, ok := r.outputCollections[endTimeStr] + outDB, ok := r.outputDBs[endTimeStr] if !ok { //connect to the db var err error - outColl, err := r.db.NewRITAOutputConnection(endTimeStr) + outDB, err := r.db.NewRitaDB(endTimeStr, autoFlushAsyncErrChan, autoFlushOnFatal) if err != nil { - return nil, errors.Wrapf(err, "could not connect to output database for suffix: %s", endTimeStr) + return outDB, errors.Wrapf(err, "could not create output database for suffix: %s", endTimeStr) } - //create the meta db record - err = r.db.EnsureMetaDBRecordExists(outColl.Database.Name) - if err != nil { - outColl.Database.Session.Close() - return nil, err - } - - //create the output buffer - outBufferedColl = buffered.NewAutoFlushCollection(outColl, r.bufferSize, r.autoFlushTime) - outBufferedColl.StartAutoFlush(autoFlushAsyncErrChan, autoFlushOnFatal) - //cache the result - r.outputCollections[endTimeStr] = outBufferedColl + r.outputDBs[endTimeStr] = outDB } - return outBufferedColl, nil + return outDB, nil } diff --git a/converter/output/rita/buffered/collection.go b/converter/output/rita/buffered/collection.go index 788e5c4..f5cfd23 100644 --- a/converter/output/rita/buffered/collection.go +++ b/converter/output/rita/buffered/collection.go @@ -3,8 +3,8 @@ package buffered import ( "sync" - "github.com/pkg/errors" mgo "github.com/globalsign/mgo" + "github.com/pkg/errors" ) //Collection wraps an *mgo.Collection in order @@ -16,7 +16,10 @@ type Collection struct { } //InitializeCollection wraps a *mgo.Collection with a buffer of a given size -//for performing buffered insertions. +//for performing buffered insertions. Note the Collection.Close() method +//closes the socket used by the collection handle. You may want to +//copy the initial connection before passing the handle to this +//constructor. func InitializeCollection(coll *Collection, mgoCollection *mgo.Collection, bufferSize int64) { coll.mgoCollection = mgoCollection coll.buffer = make([]interface{}, 0, bufferSize) diff --git a/converter/output/rita/constants/constants.go b/converter/output/rita/constants/constants.go new file mode 100644 index 0000000..6e5f8f7 --- /dev/null +++ b/converter/output/rita/constants/constants.go @@ -0,0 +1,16 @@ +package constants + +//MetaDBDatabasesCollection is the name of the RITA collection +//in the RITA MetaDB that keeps track of RITA managed databases +const MetaDBDatabasesCollection = "databases" + +//StrobesCollection contains the name for the RITA freqConn MongoDB collection +const StrobesCollection = "freqConn" + +//ConnCollection contains the name for the RITA conn MongoDB collection +const ConnCollection = "conn" + +// Version specifies which RITA DB schema the resulting data matches +var Version = "v2.0.0+ActiveCM-IPFIX" + +// TODO: Use version in RITA as dep diff --git a/converter/output/rita/db.go b/converter/output/rita/db.go new file mode 100644 index 0000000..973690c --- /dev/null +++ b/converter/output/rita/db.go @@ -0,0 +1,147 @@ +package rita + +import ( + "github.com/activecm/ipfix-rita/converter/output/rita/buffered" + "github.com/activecm/ipfix-rita/converter/output/rita/constants" + "github.com/activecm/ipfix-rita/converter/output/rita/freqconn" + "github.com/activecm/rita/parser/parsetypes" + "github.com/globalsign/mgo" + "github.com/pkg/errors" + "time" +) + +//DB represents a RITA database and provides routines +//for constructing a new RITA database and inserting +//new data into that database +type DB struct { + manager DBManager + outputDB *mgo.Database + connColl *buffered.AutoFlushCollection + connCounter freqconn.ConnCounter + strobesNotifier freqconn.StrobesNotifier +} + +func newDB(dbManager DBManager, outputDB *mgo.Database, + strobeThreshold int, bufferSize int64, flushDeadline time.Duration, + asyncErrorChan chan<- error, onFatalError func()) (DB, error) { + + strobesSess := outputDB.Session.Copy() + connSess := outputDB.Session.Copy() + + connColl := buffered.NewAutoFlushCollection( + outputDB.C(constants.ConnCollection).With(connSess), + bufferSize, flushDeadline, + ) + + //The strobes notifier needs access to the connColl so it can flush + //the connColl buffer before it removes entries from the conn collection + strobesNotifier := freqconn.NewStrobesNotifier(outputDB.With(strobesSess), connColl) + + //Get the existing strobes data so our counts start aligned + existingStrobeData, err := strobesNotifier.LoadFreqConnCollection() + if err != nil { + return DB{}, err + } + + connCounter := freqconn.NewConnCounterFromMap(existingStrobeData, strobeThreshold, strobesNotifier) + + db := DB{ + manager: dbManager, + outputDB: outputDB, + connColl: connColl, + connCounter: connCounter, + strobesNotifier: strobesNotifier, + } + + err = db.ensureConnIndexExists() + if err != nil { + strobesSess.Close() + connSess.Close() + return db, err + } + + err = db.ensureFreqConnIndexExists() + if err != nil { + strobesSess.Close() + connSess.Close() + return db, err + } + + started := connColl.StartAutoFlush(asyncErrorChan, onFatalError) + if !started { + err = errors.Errorf("failed to start auto flusher for collection %s.%s", outputDB.Name, constants.ConnCollection) + strobesSess.Close() + connSess.Close() + return db, err + } + + err = dbManager.ensureMetaDBRecordExists(outputDB.Name) + if err != nil { + strobesSess.Close() + connSess.Close() + return db, err + } + + return db, nil +} + +func (d DB) ensureConnIndexExists() error { + tmpConn := parsetypes.Conn{} + for _, index := range tmpConn.Indices() { + err := d.outputDB.C(constants.ConnCollection).EnsureIndex(mgo.Index{ + Key: []string{index}, + }) + + if err != nil { + return errors.Wrapf(err, "could not create RITA conn index %s", index) + } + } + return nil +} + +func (d DB) ensureFreqConnIndexExists() error { + tmpFreq := parsetypes.Freq{} + for _, index := range tmpFreq.Indices() { + err := d.outputDB.C(constants.StrobesCollection).EnsureIndex(mgo.Index{ + Key: []string{index}, + }) + + if err != nil { + return errors.Wrapf(err, "could not create RITA freqConn index %s", index) + } + } + return nil +} + +//InsertConnRecord writes a connection record to the RITA database. +//Each connection pair is counted, and if the count exceeds a threshold, +//the connection info is sent to freqConn, otherwise it is sent to conn +func (d DB) InsertConnRecord(connRecord *parsetypes.Conn) error { + thresholdMet, err := d.connCounter.Increment(freqconn.UConnPair{ + Src: connRecord.Source, + Dst: connRecord.Destination, + }) + if err != nil { + return err + } + if !thresholdMet { + return d.connColl.Insert(connRecord) + } + return nil +} + +//MarkFinished ensures that the database is ready for analysis +//by RITA. Note: MarkFinished may be called after Close. This is +//by design so the MetaDatabase is only updated after the last +//of the data has been flushed to MongoDB. +func (d DB) MarkFinished() error { + return d.manager.markImportFinishedInMetaDB(d.outputDB.Name) +} + +//Close closes the underlying database connections wrapped by the DB +func (d DB) Close() error { + d.strobesNotifier.Close() + //An error may arise when the collection is flushed in d.connColl.Close() + err := d.connColl.Close() + return err +} diff --git a/converter/output/rita/database.go b/converter/output/rita/dbmanager.go similarity index 51% rename from converter/output/rita/database.go rename to converter/output/rita/dbmanager.go index d751d69..988b189 100644 --- a/converter/output/rita/database.go +++ b/converter/output/rita/dbmanager.go @@ -5,23 +5,12 @@ import ( "github.com/activecm/ipfix-rita/converter/config" "github.com/activecm/ipfix-rita/converter/database" - "github.com/activecm/rita/parser/parsetypes" + "github.com/activecm/ipfix-rita/converter/output/rita/constants" mgo "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "github.com/pkg/errors" ) -//MetaDBDatabasesCollection is the name of the RITA collection -//in the RITA MetaDB that keeps track of RITA managed databases -const MetaDBDatabasesCollection = "databases" - -//RitaConnInputCollection is the name of the RITA collection -//which houses input connection data -const RitaConnInputCollection = "conn" - -var Version = "v2.0.0+ActiveCM-IPFIX" - -//TODO: Use version in RITA as dep // DBMetaInfo defines some information about the database type DBMetaInfo struct { ID bson.ObjectId `bson:"_id,omitempty"` // Ident @@ -32,18 +21,27 @@ type DBMetaInfo struct { AnalyzeVersion string `bson:"analyze_version"` // Rita version at analyze } -//OutputDB wraps a *mgo.Session connected to MongoDB +//DBManager wraps a *mgo.Session connected to MongoDB //and provides facility for interacting with RITA compatible databases -type OutputDB struct { - ssn *mgo.Session - metaDBName string - dbRoot string +type DBManager struct { + ssn *mgo.Session + metaDBName string + dbRoot string + strobeThreshold int + bufferSize int64 + flushDeadline time.Duration } -//NewOutputDB instantiates a new RITAOutputDB with the +//NewDBManager instantiates a new RITAOutputDB with the //details specified in the RITA configuration -func NewOutputDB(ritaConf config.RITA) (OutputDB, error) { - db := OutputDB{} +func NewDBManager(ritaConf config.RITA, bufferSize int64, flushDeadline time.Duration) (DBManager, error) { + + db := DBManager{ + strobeThreshold: ritaConf.GetStrobe().GetConnectionLimit(), + bufferSize: bufferSize, + flushDeadline: flushDeadline, + } + var err error db.ssn, err = database.Dial(ritaConf.GetConnectionConfig()) if err != nil { @@ -56,7 +54,7 @@ func NewOutputDB(ritaConf config.RITA) (OutputDB, error) { db.dbRoot = ritaConf.GetDBRoot() db.metaDBName = ritaConf.GetMetaDB() - db.ssn.DB(db.metaDBName).C(MetaDBDatabasesCollection).EnsureIndex(mgo.Index{ + err = db.ssn.DB(db.metaDBName).C(constants.MetaDBDatabasesCollection).EnsureIndex(mgo.Index{ Key: []string{ "name", }, @@ -73,56 +71,42 @@ func NewOutputDB(ritaConf config.RITA) (OutputDB, error) { return db, nil } -//NewMetaDBDatabasesConnection returns a new socket connected to the -//MetaDB databases collection -func (o OutputDB) NewMetaDBDatabasesConnection() *mgo.Collection { - return o.ssn.DB(o.metaDBName).C(MetaDBDatabasesCollection).With(o.ssn.Copy()) -} - -//NewRITAOutputConnection returns a new socket connected to the -//RITA output collection with a given DB suffix -func (o OutputDB) NewRITAOutputConnection(dbNameSuffix string) (*mgo.Collection, error) { - ssn := o.ssn.Copy() - dbName := o.dbRoot +//NewRitaDB creates a new RITA Database by creating the appropriate +//MetaDB records, ensuring the correct indexes are in place, and returning +//a new rita.DB object. As data is written to the rita.DB object, +//data is continually flushed out to the database on another thread. +//If any errors occur on the flushing thread, they are reported on +//asyncErrorChan. If a fatal error occurs, onFatalError is called. +func (d DBManager) NewRitaDB(dbNameSuffix string, asyncErrorChan chan<- error, onFatalError func()) (DB, error) { + dbName := d.dbRoot if dbNameSuffix != "" { - dbName = o.dbRoot + "-" + dbNameSuffix - } - - //create the conn collection handle - connColl := ssn.DB(dbName).C(RitaConnInputCollection) - - //ensure RITA's needed indexes exist - tmpConn := parsetypes.Conn{} - for _, index := range tmpConn.Indices() { - err := connColl.EnsureIndex(mgo.Index{ - Key: []string{index}, - }) - - if err != nil { - ssn.Close() - return nil, errors.Wrapf(err, "could not create RITA conn index %s", index) - } + dbName = d.dbRoot + "-" + dbNameSuffix } - return connColl, nil + //note newDB will spawn off new sockets for connecting to MongoDB + return newDB( + d, d.ssn.DB(dbName), + d.strobeThreshold, d.bufferSize, d.flushDeadline, + asyncErrorChan, onFatalError, + ) } -//EnsureMetaDBRecordExists ensures that a database record exists in the +//ensureMetaDBRecordExists ensures that a database record exists in the //MetaDatabase for a given database name. This allows RITA to manage //the database. -func (o OutputDB) EnsureMetaDBRecordExists(dbName string) error { - numRecords, err := o.ssn.DB(o.metaDBName).C(MetaDBDatabasesCollection).Find(bson.M{"name": dbName}).Count() +func (d DBManager) ensureMetaDBRecordExists(dbName string) error { + numRecords, err := d.ssn.DB(d.metaDBName).C(constants.MetaDBDatabasesCollection).Find(bson.M{"name": dbName}).Count() if err != nil { return errors.Wrapf(err, "could not count MetaDB records with name: %s", dbName) } if numRecords != 0 { return nil } - err = o.ssn.DB(o.metaDBName).C(MetaDBDatabasesCollection).Insert(DBMetaInfo{ + err = d.ssn.DB(d.metaDBName).C(constants.MetaDBDatabasesCollection).Insert(DBMetaInfo{ Name: dbName, ImportFinished: false, Analyzed: false, - ImportVersion: Version, + ImportVersion: constants.Version, AnalyzeVersion: "", }) if err != nil { @@ -131,12 +115,12 @@ func (o OutputDB) EnsureMetaDBRecordExists(dbName string) error { return nil } -//MarkImportFinishedInMetaDB sets the import_finished flag on the +//markImportFinishedInMetaDB sets the import_finished flag on the //RITA MetaDatabase database record. This lets RITA know that no //more data will be placed in the database and that the database //is ready for analysis. -func (o OutputDB) MarkImportFinishedInMetaDB(dbName string) error { - err := o.ssn.DB(o.metaDBName).C(MetaDBDatabasesCollection).Update( +func (d DBManager) markImportFinishedInMetaDB(dbName string) error { + err := d.ssn.DB(d.metaDBName).C(constants.MetaDBDatabasesCollection).Update( bson.M{"name": dbName}, bson.M{ "$set": bson.M{ @@ -146,19 +130,19 @@ func (o OutputDB) MarkImportFinishedInMetaDB(dbName string) error { ) if err != nil { - return errors.Wrapf(err, "could not mark database %s imported in database index %s.%s", dbName, o.metaDBName, MetaDBDatabasesCollection) + return errors.Wrapf(err, "could not mark database %s imported in database index %s.%s", dbName, d.metaDBName, constants.MetaDBDatabasesCollection) } return nil } //Ping ensures the database connection is valid -func (o OutputDB) Ping() error { - err := o.ssn.Ping() +func (d DBManager) Ping() error { + err := d.ssn.Ping() if err != nil { return errors.Wrap(err, "could not contact the database") } //see if theres any permissions problems - _, err = o.ssn.DatabaseNames() + _, err = d.ssn.DatabaseNames() if err != nil { return errors.Wrap(err, "could not list the databases in the database") } @@ -166,6 +150,6 @@ func (o OutputDB) Ping() error { } //Close closing the underlying connection to MongoDB -func (o OutputDB) Close() { - o.ssn.Close() +func (d DBManager) Close() { + d.ssn.Close() } diff --git a/converter/output/rita/freqconn/counter.go b/converter/output/rita/freqconn/counter.go new file mode 100644 index 0000000..f4f423c --- /dev/null +++ b/converter/output/rita/freqconn/counter.go @@ -0,0 +1,82 @@ +package freqconn + +// UConnPair records a unique connection pair. i.e. +// two ip addresses. Used to track how many times +// two hosts talk to each other +type UConnPair struct { + Src string `bson:"src"` + Dst string `bson:"dst"` +} + +// FreqConn records how many times a unique connection pair +// connected +type FreqConn struct { + UConnPair `bson:",inline"` + ConnectionCount int `bson:"connection_count"` +} + +// ConnCounter tracks how many UConnPairs with +// matching source and destination addresses have been processed. +// When the count for a given UConnPair meets the threshold, +// the ThresholdMet method on the given ConnCountNotifier will be executed +// with the UConnPair and the new count. If the count then exceeds the threshold, +// the ThresholdExceeded method will then be ran in a similar fashion. +type ConnCounter struct { + connectionCounts map[UConnPair]int + threshold int + notifier ConnCountNotifier +} + +// ConnCountNotifier specifies an interface for updating an external component +// with the new count for a given connection pair. ThresholdMet will be called +// when the count hits a specified threshold, and ThresholdExceeded will be +// called when the count exceeds a specified threshold. +type ConnCountNotifier interface { + ThresholdMet(UConnPair, int) error + ThresholdExceeded(UConnPair, int) error +} + +// NewConnCounter creates a new ConnCounter. Each unique connection +// starts at 0. +func NewConnCounter(threshold int, notifier ConnCountNotifier) ConnCounter { + return ConnCounter{ + connectionCounts: make(map[UConnPair]int), + threshold: threshold, + notifier: notifier, + } +} + +// NewConnCounterFromMap creates a new ConnCounter. Each unique +// connection starts with the counts supplied in the data map. +func NewConnCounterFromMap(data map[UConnPair]int, threshold int, notifier ConnCountNotifier) ConnCounter { + c := ConnCounter{ + connectionCounts: data, + threshold: threshold, + notifier: notifier, + } + return c +} + +// Increment increments the count corresponding to the +// UConnPair passed in. If the ConnCounter threshold is +// met, thresholdMetFunc is ran. Alternatively, if the +// threshold is exceeded, thresholdExceededFunc is ran. +// Returns true if either thresholdMet or thresholdExceeded +// is called. May return an error from either function. +// If an error is returned, the count is not updated. +func (f ConnCounter) Increment(connectionPair UConnPair) (bool, error) { + newCount := f.connectionCounts[connectionPair] + 1 + var err error + funcRan := false + if newCount == f.threshold { + err = f.notifier.ThresholdMet(connectionPair, newCount) + funcRan = true + } else if newCount > f.threshold { + err = f.notifier.ThresholdExceeded(connectionPair, newCount) + funcRan = true + } + if err == nil { + f.connectionCounts[connectionPair] = newCount + } + return funcRan, err +} diff --git a/converter/output/rita/freqconn/counter_test.go b/converter/output/rita/freqconn/counter_test.go new file mode 100644 index 0000000..d25acad --- /dev/null +++ b/converter/output/rita/freqconn/counter_test.go @@ -0,0 +1,135 @@ +package freqconn_test + +import ( + "errors" + "testing" + + "github.com/activecm/ipfix-rita/converter/output/rita/freqconn" + "github.com/stretchr/testify/require" +) + +//closureConnCountNotifier provides a way to anonymously define +//an implementation for ConnCountNotifier +type closureConnCountNotifier struct { + thresholdMetFunc func(freqconn.UConnPair, int) error + thresholdExceededFunc func(freqconn.UConnPair, int) error +} + +func (a closureConnCountNotifier) ThresholdMet(conn freqconn.UConnPair, count int) error { + return a.thresholdMetFunc(conn, count) +} + +func (a closureConnCountNotifier) ThresholdExceeded(conn freqconn.UConnPair, count int) error { + return a.thresholdExceededFunc(conn, count) +} + +//TestThresholdMet ensures thresholdMet is called +//when the connection counter hits the threshold but +//thresholdExceeded is not +func TestThresholdMet(t *testing.T) { + shouldPass := false + thresholdMet := func(conn freqconn.UConnPair, count int) error { + shouldPass = true + return nil + } + thresholdExceeded := func(conn freqconn.UConnPair, count int) error { + t.Fatalf("thresholdExceeded called when it should not have been. count %d, threshold %d", count, testThreshold) + return nil + } + c := freqconn.NewConnCounter(testThreshold, closureConnCountNotifier{thresholdMet, thresholdExceeded}) + + testConnection := freqconn.UConnPair{ + Src: "1.1.1.1", + Dst: "2.2.2.2", + } + + for i := 0; i < testThreshold-1; i++ { + funcRan, err := c.Increment(testConnection) + require.False(t, funcRan, "Increment said a threshold function ran when it should not have") + require.Nil(t, err, "Increment returned an error when it shouldn't have") + } + + funcRan, err := c.Increment(testConnection) + require.True(t, funcRan, "Increment said threshold function did not run when it should have") + require.Nil(t, err, "Increment returned an error when it shouldn't have") + + require.True(t, shouldPass, "thresholdMet was not called.") +} + +//TestThresholdExceeded ensures thresholdExceeded is called +//when the connection counter exceeds the threshold but +//thresholdExceeded is not +func TestThresholdExceeded(t *testing.T) { + shouldPass := false + thresholdMetCalledOnce := false + thresholdMet := func(conn freqconn.UConnPair, count int) error { + if thresholdMetCalledOnce { + t.Fatalf("thresholdMet called when it should not have been. count %d, threshold %d", count, testThreshold) + } else { + thresholdMetCalledOnce = true + } + return nil + } + thresholdExceeded := func(conn freqconn.UConnPair, count int) error { + shouldPass = true + return nil + } + c := freqconn.NewConnCounter(testThreshold, closureConnCountNotifier{thresholdMet, thresholdExceeded}) + + testConnection := freqconn.UConnPair{ + Src: "1.1.1.1", + Dst: "2.2.2.2", + } + + for i := 0; i < testThreshold-1; i++ { + funcRan, err := c.Increment(testConnection) + require.False(t, funcRan, "Increment said a threshold function ran when it should not have") + require.Nil(t, err, "Increment returned an error when it shouldn't have") + } + + funcRan, err := c.Increment(testConnection) + require.True(t, funcRan, "Increment said threshold function did not run when it should have") + require.Nil(t, err, "Increment returned an error when it shouldn't have") + + funcRan, err = c.Increment(testConnection) + require.True(t, funcRan, "Increment said threshold function did not run when it should have") + require.Nil(t, err, "Increment returned an error when it shouldn't have") + + require.True(t, shouldPass, "thresholdExceeded was not called.") +} + +//TestErrorsReturned ensures the errors returned from thresholdExceeded +//and thresholdMet are returned via Increment. Additionally the test +//asserts that the counter should not be incremented if there is an error. +func TestErrorsReturned(t *testing.T) { + thresholdMetErr := errors.New("thresholdMet error") + thresholdExceededErr := errors.New("thresholdExceeded error") + + thresholdMet := func(conn freqconn.UConnPair, count int) error { + return thresholdMetErr + } + thresholdExceeded := func(conn freqconn.UConnPair, count int) error { + return thresholdExceededErr + } + + c := freqconn.NewConnCounter(1, closureConnCountNotifier{thresholdMet, thresholdExceeded}) + + testConnection := freqconn.UConnPair{ + Src: "1.1.1.1", + Dst: "2.2.2.2", + } + + funcRan, err := c.Increment(testConnection) + require.True(t, funcRan, "Increment said threshold function did not run when it should have") + require.Equal(t, thresholdMetErr, err, "error from thresholdMet not returned") + + funcRan, err = c.Increment(testConnection) + require.True(t, funcRan, "Increment said threshold function did not run when it should have") + require.Equal(t, thresholdMetErr, err, "error from thresholdMet not returned") + + c = freqconn.NewConnCounter(1, closureConnCountNotifier{func(freqconn.UConnPair, int) error { return nil }, thresholdExceeded}) + c.Increment(testConnection) + funcRan, err = c.Increment(testConnection) + require.True(t, funcRan, "Increment said threshold function did not run when it should have") + require.Equal(t, thresholdExceededErr, err, "error from thresholdMet not returned") +} diff --git a/converter/output/rita/freqconn/main_test.go b/converter/output/rita/freqconn/main_test.go new file mode 100644 index 0000000..b663232 --- /dev/null +++ b/converter/output/rita/freqconn/main_test.go @@ -0,0 +1,106 @@ +package freqconn_test + +import ( + "github.com/globalsign/mgo" + "os" + "testing" + + "github.com/activecm/dbtest" + "github.com/activecm/ipfix-rita/converter/integrationtest" + "github.com/activecm/ipfix-rita/converter/output/rita/constants" +) + +const testThreshold = 10 + +const testDBName = "test" + +const mongoContainerFixtureKey = "freqconn-test-db-container" + +var fixtureManager *integrationtest.FixtureManager + +var freqConnInitFixture = integrationtest.TestFixture{ + Key: "freqconn-init", + Requires: []string{mongoContainerFixtureKey}, + LongRunning: true, + Before: func(t *testing.T, fixtures integrationtest.FixtureData) (interface{}, bool) { + //The tests maintain their own sessions, we just make sure the database is empty + //and the collections have their proper indices + mongoContainer := fixtures.Get(mongoContainerFixtureKey).(dbtest.MongoDBContainer) + ssn, err := mongoContainer.NewSession() + if err != nil { + t.Error(err) + return nil, false + } + + connCollection := ssn.DB(testDBName).C(constants.ConnCollection) + strobesCollection := ssn.DB(testDBName).C(constants.StrobesCollection) + + connCount, err := connCollection.Find(nil).Count() + if err != nil { + t.Error(err) + return nil, false + } + + if connCount != 0 { + err = connCollection.DropCollection() + if err != nil { + t.Error(err) + return nil, false + } + } + + strobesCount, err := strobesCollection.Find(nil).Count() + if err != nil { + t.Error(err) + return nil, false + } + + if strobesCount != 0 { + err = strobesCollection.DropCollection() + if err != nil { + t.Error(err) + return nil, false + } + } + + connIndices := []string{"$hashed:id_orig_h", "$hashed:id_resp_h", "-duration", "ts", "uid"} + strobesIndices := []string{"$hashed:src", "$hashed:dst", "-connection_count"} + + for _, index := range connIndices { + err := connCollection.EnsureIndex(mgo.Index{ + Key: []string{index}, + }) + if err != nil { + t.Error(err) + return nil, false + } + } + + for _, index := range strobesIndices { + err := strobesCollection.EnsureIndex(mgo.Index{ + Key: []string{index}, + }) + if err != nil { + t.Error(err) + return nil, false + } + } + ssn.Close() + return nil, false + }, +} + +//TestMain is responsible for setting up and tearing down any +//resources needed by all tests +func TestMain(m *testing.M) { + fixtureManager = integrationtest.NewFixtureManager() + fixtureManager.RegisterFixture(integrationtest.DockerLoaderFixture) + fixtureManager.RegisterFixture( + integrationtest.NewMongoDBContainerFixture(mongoContainerFixtureKey), + ) + fixtureManager.RegisterFixture(freqConnInitFixture) + fixtureManager.BeginTestPackage() + returnCode := m.Run() + fixtureManager.EndTestPackage() + os.Exit(returnCode) +} diff --git a/converter/output/rita/freqconn/strobes.go b/converter/output/rita/freqconn/strobes.go new file mode 100644 index 0000000..f9f5b8d --- /dev/null +++ b/converter/output/rita/freqconn/strobes.go @@ -0,0 +1,104 @@ +package freqconn + +import ( + "github.com/activecm/ipfix-rita/converter/output/rita/buffered" + "github.com/activecm/ipfix-rita/converter/output/rita/constants" + "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" +) + +//StrobesNotifier implements ConnCountNotifier and serves to keep +//the RITA conn and freqConn collections in line with the internal +//connection count map. This effectively implements RITA's "strobes" analysis. +type StrobesNotifier struct { + db *mgo.Database + connAutoFlushColl *buffered.AutoFlushCollection +} + +//NewStrobesNotifier creates a new StrobesNotifier from a MongoDB +//database handle. If an AutoFlushCollection is currently bound +//to the RITA conn collection, pass in a reference and the Notifier +//will ensure the collection buffer is flushed before altering the +//conn collection. If no such AutoFlushCollection exists, pass in nil. +// Note the StrobesNotifier.Close() method +//closes the socket used by the db handle. You may want to +//copy the initial connection before passing the handle to this +//constructor. +func NewStrobesNotifier(db *mgo.Database, connAutoFlushColl *buffered.AutoFlushCollection) StrobesNotifier { + return StrobesNotifier{ + db: db, + connAutoFlushColl: connAutoFlushColl, + } +} + +//LoadFreqConnCollection reads the data in the StrobesCollection +//of a RITA database into a map which counts how many times +//a connection pair was seen +func (s StrobesNotifier) LoadFreqConnCollection() (map[UConnPair]int, error) { + strobeIter := s.db.C(constants.StrobesCollection).Find(nil).Iter() + dataMap := make(map[UConnPair]int) + var entry FreqConn + for strobeIter.Next(&entry) { + dataMap[entry.UConnPair] = entry.ConnectionCount + } + err := strobeIter.Err() + return dataMap, err +} + +//ThresholdMet deletes any matching entries in the RITA ConnCollection +//and creates a new record in the freqConn collection +func (s StrobesNotifier) ThresholdMet(connPair UConnPair, count int) error { + + //We have to ensure that the auto flush collection pushes any buffered + //data to the conn collection before we issue the remove command. + //Otherwise data may be pushed into the conn collection after we call + //remove. + if s.connAutoFlushColl != nil { + err := s.connAutoFlushColl.Flush() + if err != nil { + return err + } + } + + _, err := s.db.C(constants.ConnCollection).RemoveAll(bson.M{ + "$and": []bson.M{ + bson.M{"id_orig_h": connPair.Src}, + bson.M{"id_resp_h": connPair.Dst}, + }, + }) + + if err != nil { + return err + } + err = s.db.C(constants.StrobesCollection).Insert(FreqConn{ + UConnPair: connPair, + ConnectionCount: count, + }) + return err +} + +//ThresholdExceeded updates the connection_count field in a freqConn collection +//entry matching a given UConnPair +func (s StrobesNotifier) ThresholdExceeded(connPair UConnPair, count int) error { + //Note we have to track the count in counter.go anyways, so + //we could just update with count instead of calling inc + //but inc gets the point across a bit better. + + err := s.db.C(constants.StrobesCollection).Update( + bson.M{ + "src": connPair.Src, + "dst": connPair.Dst, + }, + bson.M{ + "$inc": bson.M{ + "connection_count": 1, + }, + }, + ) + return err +} + +//Close closes the socket wrapped by the Database +func (s StrobesNotifier) Close() { + s.db.Session.Close() +} diff --git a/converter/output/rita/freqconn/strobes_test.go b/converter/output/rita/freqconn/strobes_test.go new file mode 100644 index 0000000..472df32 --- /dev/null +++ b/converter/output/rita/freqconn/strobes_test.go @@ -0,0 +1,215 @@ +package freqconn_test + +import ( + "fmt" + "github.com/activecm/dbtest" + "github.com/activecm/ipfix-rita/converter/output/rita/buffered" + "github.com/activecm/ipfix-rita/converter/output/rita/constants" + "github.com/activecm/ipfix-rita/converter/output/rita/freqconn" + "github.com/activecm/rita/parser/parsetypes" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +//TestLoadFreqConnCollection loads the freqConn collection up with data and pulls it down +//with the LoadFreqConnCollection function. +func TestLoadFreqConnCollection(t *testing.T) { + fixtures := fixtureManager.BeginTest(t) + defer fixtureManager.EndTest(t) + + mongoDBContainer := fixtures.GetWithSkip(t, mongoContainerFixtureKey).(dbtest.MongoDBContainer) + + ssn, err := mongoDBContainer.NewSession() + require.Nil(t, err, "Could not connect to MongoDB") + defer ssn.Close() + + testDB := ssn.DB(testDBName) + + // Populate the collection + for i := 0; i < 100; i++ { + err = testDB.C(constants.StrobesCollection).Insert(&freqconn.FreqConn{ + UConnPair: freqconn.UConnPair{ + Src: fmt.Sprintf("%d.%d.%d.%d", i, i, i, i), + Dst: fmt.Sprintf("%d.%d.%d.%d", i+1, i+1, i+1, i+1), + }, + ConnectionCount: i, + }) + require.Nil(t, err, "Could not insert test data") + } + + freqConnNotifier := freqconn.NewStrobesNotifier(testDB, nil) + + // Try to read the data + data, err := freqConnNotifier.LoadFreqConnCollection() + require.Nil(t, err, "Could not fetch freqconn entries") + + require.Len(t, data, 100, "Number of retrieved records does not match inserted data") + + for connPair, count := range data { + srcStr := fmt.Sprintf("%d.%d.%d.%d", count, count, count, count) + destStr := fmt.Sprintf("%d.%d.%d.%d", count+1, count+1, count+1, count+1) + require.Equal(t, srcStr, connPair.Src, "Data retrieved does not match the original data") + require.Equal(t, destStr, connPair.Dst, "Data retrieved does not match the original data") + } +} + +//TestStrobesThresholdMet ensures ThresholdMet clears out any matching records in the conn +//collection and inserts a new record into the freqConn collection. +func TestStrobesThresholdMet(t *testing.T) { + fixtures := fixtureManager.BeginTest(t) + defer fixtureManager.EndTest(t) + + mongoDBContainer := fixtures.GetWithSkip(t, mongoContainerFixtureKey).(dbtest.MongoDBContainer) + + ssn, err := mongoDBContainer.NewSession() + require.Nil(t, err, "Could not connect to MongoDB") + defer ssn.Close() + + testDB := ssn.DB(testDBName) + + srcIP := "1.1.1.1" + dstIP := "2.2.2.2" + + s := parsetypes.Conn{ + Source: srcIP, + Destination: dstIP, + } + + for i := 0; i < testThreshold-1; i++ { + err = testDB.C(constants.ConnCollection).Insert(&s) + require.Nil(t, err, "Could not insert test data") + } + + freqConnNotifier := freqconn.NewStrobesNotifier(testDB, nil) + + err = freqConnNotifier.ThresholdMet(freqconn.UConnPair{ + Src: srcIP, + Dst: dstIP, + }, testThreshold) + + require.Nil(t, err, "Could not delete existing conn records or create a new freqConn record") + + connCount, err := testDB.C(constants.ConnCollection).Count() + require.Nil(t, err, "Could not count how many records remain in conn collection") + require.Zero(t, connCount, "Matching records were not removed from the conn collection after ThresholdMet was ran") + + freqCount, err := testDB.C(constants.StrobesCollection).Count() + require.Nil(t, err, "Could not count how many records exist in freqConn collection") + require.Equal(t, 1, freqCount, "ThresholdMet did not create a single record in freqConn") + + var freqResult freqconn.FreqConn + err = testDB.C(constants.StrobesCollection).Find(nil).One(&freqResult) + require.Nil(t, err, "Could not check freqConn for new records after ThresholdMet was ran") + + require.Equal(t, srcIP, freqResult.Src, "Source IP in freqConn does not match the original address") + require.Equal(t, dstIP, freqResult.Dst, "Destination IP in freqConn does not match the original address") + require.Equal(t, testThreshold, freqResult.ConnectionCount, "Connection count in freqConn does not match the count passed to ThresholdMet") +} + +//TestStrobesThresholdExceeded ensures ThresholdExceeded increments the connection_count +//for a given UConnPair in the freqConn collection. +func TestStrobesThresholdExceeded(t *testing.T) { + fixtures := fixtureManager.BeginTest(t) + defer fixtureManager.EndTest(t) + + mongoDBContainer := fixtures.GetWithSkip(t, mongoContainerFixtureKey).(dbtest.MongoDBContainer) + + ssn, err := mongoDBContainer.NewSession() + require.Nil(t, err, "Could not connect to MongoDB") + defer ssn.Close() + + testDB := ssn.DB(testDBName) + + uconn := freqconn.UConnPair{ + Src: "1.1.1.1", + Dst: "2.2.2.2", + } + + err = testDB.C(constants.StrobesCollection).Insert(&freqconn.FreqConn{ + UConnPair: uconn, + ConnectionCount: testThreshold, + }) + require.Nil(t, err, "Could not populate freqConn with test data") + + freqNotifier := freqconn.NewStrobesNotifier(testDB, nil) + + incAmount := 10 + + for i := testThreshold + 1; i <= testThreshold+incAmount; i++ { + freqNotifier.ThresholdExceeded(uconn, i) + } + + var freqResult freqconn.FreqConn + err = testDB.C(constants.StrobesCollection).Find(&uconn).One(&freqResult) + require.Nil(t, err, "Could not check freqConn for new records after ThresholdExceeded was run") + + require.Equal(t, testThreshold+incAmount, freqResult.ConnectionCount, "Connection count incorrect after calling ThresholdExceeded") + require.Equal(t, uconn.Src, freqResult.Src) + require.Equal(t, uconn.Dst, freqResult.Dst) +} + +func TestStrobesWithAutoFlushCollection(t *testing.T) { + fixtures := fixtureManager.BeginTest(t) + defer fixtureManager.EndTest(t) + + mongoDBContainer := fixtures.GetWithSkip(t, mongoContainerFixtureKey).(dbtest.MongoDBContainer) + + ssn, err := mongoDBContainer.NewSession() + require.Nil(t, err, "Could not connect to MongoDB") + defer ssn.Close() + + testDB := ssn.DB(testDBName) + + srcIP := "1.1.1.1" + dstIP := "2.2.2.2" + + s := parsetypes.Conn{ + Source: srcIP, + Destination: dstIP, + } + bufferSize := 200 + flushTime := 2 * time.Second + + //Load an autoflush collection up with data (but not to the point where it will flush because the buffer is full) + autoFlushColl := buffered.NewAutoFlushCollection(testDB.C(constants.ConnCollection), int64(bufferSize), flushTime) + + errs := make(chan error, bufferSize) + autoFlushColl.StartAutoFlush(errs, func() { t.FailNow() }) + + for i := 0; i < bufferSize/2; i++ { + err = autoFlushColl.Insert(&s) + require.Nil(t, err, "Could not insert test data") + } + + //Run the ThresholdMet method. This should flush the auto flush collection. + //If it doesn't the collection will have records in it after we call ThresholdMet + //and wait for the deadline to pass + freqConnNotifier := freqconn.NewStrobesNotifier(testDB, autoFlushColl) + + err = freqConnNotifier.ThresholdMet(freqconn.UConnPair{ + Src: srcIP, + Dst: dstIP, + }, testThreshold) + + require.Nil(t, err, "Could not delete existing conn records or create a new freqConn record") + + //Wait for the auto flush deadline to pass + time.Sleep(flushTime) + + connCount, err := testDB.C(constants.ConnCollection).Count() + require.Nil(t, err, "Could not count how many records remain in conn collection") + require.Zero(t, connCount, "Matching records were not removed from the conn collection/ the auto flush buffer after ThresholdMet was ran") + + freqCount, err := testDB.C(constants.StrobesCollection).Count() + require.Nil(t, err, "Could not count how many records exist in freqConn collection") + require.Equal(t, 1, freqCount, "ThresholdMet did not create a single record in freqConn") + + var freqResult freqconn.FreqConn + err = testDB.C(constants.StrobesCollection).Find(nil).One(&freqResult) + require.Nil(t, err, "Could not check freqConn for new records after ThresholdMet was ran") + + require.Equal(t, srcIP, freqResult.Src, "Source IP in freqConn does not match the original address") + require.Equal(t, dstIP, freqResult.Dst, "Destination IP in freqConn does not match the original address") + require.Equal(t, testThreshold, freqResult.ConnectionCount, "Connection count in freqConn does not match the count passed to ThresholdMet") +} diff --git a/converter/output/rita/streaming/dates/main_test.go b/converter/output/rita/streaming/dates/main_test.go index 62991db..5706fad 100644 --- a/converter/output/rita/streaming/dates/main_test.go +++ b/converter/output/rita/streaming/dates/main_test.go @@ -61,8 +61,7 @@ var streamingRITATimeIntervalWriterFixture = integrationtest.TestFixture{ ritaWriter, err := dates.NewStreamingRITATimeIntervalWriter( env.GetOutputConfig().GetRITAConfig(), - internalNets, - bufferSize, autoFlushTime, + internalNets, bufferSize, autoFlushTime, intervalLengthMillis, gracePeriodCutoffMillis, clock, timezone, timeFormatString, env.Logger, diff --git a/converter/output/rita/streaming/dates/rita_dates.go b/converter/output/rita/streaming/dates/rita_dates.go index 45a710a..f57eff3 100644 --- a/converter/output/rita/streaming/dates/rita_dates.go +++ b/converter/output/rita/streaming/dates/rita_dates.go @@ -11,7 +11,6 @@ import ( "github.com/activecm/ipfix-rita/converter/logging" "github.com/activecm/ipfix-rita/converter/output" "github.com/activecm/ipfix-rita/converter/output/rita" - "github.com/activecm/ipfix-rita/converter/output/rita/buffered" "github.com/activecm/ipfix-rita/converter/stitching/session" "github.com/activecm/rita/parser/parsetypes" "github.com/benbjohnson/clock" @@ -19,21 +18,19 @@ import ( ) type streamingRITATimeIntervalWriter struct { - ritaDBManager rita.OutputDB + ritaDBManager rita.DBManager localNets []net.IPNet - collectionBufferSize int64 - autoflushDeadline time.Duration segmentTSFactory SegmentRelativeTimestampFactory gracePeriodCutoffMillis int64 timeFormatString string timezone *time.Location - clock clock.Clock - inGracePeriod bool - currentSegmentTS SegmentRelativeTimestamp - previousCollection *buffered.AutoFlushCollection - currentCollection *buffered.AutoFlushCollection - collectionMutex *sync.Mutex + clock clock.Clock + inGracePeriod bool + currentSegmentTS SegmentRelativeTimestamp + previousDB *rita.DB + currentDB *rita.DB + collectionMutex *sync.Mutex log logging.Logger } @@ -55,19 +52,17 @@ func NewStreamingRITATimeIntervalWriter(ritaConf config.RITA, localNets []net.IP gracePeriodCutoffMillis int64, clock clock.Clock, timezone *time.Location, timeFormatString string, log logging.Logger) (output.SessionWriter, error) { - db, err := rita.NewOutputDB(ritaConf) + db, err := rita.NewDBManager(ritaConf, bufferSize, autoFlushTime) if err != nil { return nil, errors.Wrap(err, "could not connect to RITA MongoDB") } return &streamingRITATimeIntervalWriter{ - ritaDBManager: db, - localNets: localNets, - collectionBufferSize: bufferSize, - autoflushDeadline: autoFlushTime, - segmentTSFactory: NewSegmentRelativeTimestampFactory(intervalLengthMillis, timezone), - timezone: timezone, - clock: clock, + ritaDBManager: db, + localNets: localNets, + segmentTSFactory: NewSegmentRelativeTimestampFactory(intervalLengthMillis, timezone), + timezone: timezone, + clock: clock, gracePeriodCutoffMillis: gracePeriodCutoffMillis, timeFormatString: timeFormatString, collectionMutex: new(sync.Mutex), @@ -75,29 +70,18 @@ func NewStreamingRITATimeIntervalWriter(ritaConf config.RITA, localNets []net.IP }, nil } -func (s *streamingRITATimeIntervalWriter) newAutoFlushCollection(unixTSMillis int64, - onFatal func(), autoFlushErrChan chan<- error) (*buffered.AutoFlushCollection, error) { +func (s *streamingRITATimeIntervalWriter) newRITADB(unixTSMillis int64, + onFatal func(), autoFlushErrChan chan<- error) (*rita.DB, error) { //time.Unix(seconds, nanoseconds) - //1000 milliseconds per second, 1000 nanosecodns to a microsecond. 1000 microseconds to a millisecond + //1000 milliseconds per second, 1000 nanoseconds to a microsecond. 1000 microseconds to a millisecond newTime := time.Unix(unixTSMillis/1000, (unixTSMillis%1000)*1000*1000).In(s.timezone) - newColl, err := s.ritaDBManager.NewRITAOutputConnection(newTime.Format(s.timeFormatString)) + newDB, err := s.ritaDBManager.NewRitaDB(newTime.Format(s.timeFormatString), autoFlushErrChan, onFatal) if err != nil { - return nil, errors.Wrapf(err, "failed to start auto flusher for collection XXX-%s.%s", newTime.Format(s.timeFormatString), rita.RitaConnInputCollection) + return nil, errors.Wrapf(err, "failed to create new rita database") } - err = s.ritaDBManager.EnsureMetaDBRecordExists(newColl.Database.Name) - if err != nil { - return nil, errors.Wrapf(err, "failed to start auto flusher for collection XXX-%s.%s", newTime.Format(s.timeFormatString), rita.RitaConnInputCollection) - } - - newAutoFlushCollection := buffered.NewAutoFlushCollection(newColl, s.collectionBufferSize, s.autoflushDeadline) - started := newAutoFlushCollection.StartAutoFlush(autoFlushErrChan, onFatal) - if !started { - errmsg := fmt.Sprintf("failed to start auto flusher for collection XXX-%s.%s", newTime.Format(s.timeFormatString), rita.RitaConnInputCollection) - return nil, errors.New(errmsg) - } - return newAutoFlushCollection, nil + return &newDB, nil } func (s *streamingRITATimeIntervalWriter) initializeCurrentSegmentAndGracePeriod( @@ -164,25 +148,32 @@ FlushLoop: if s.inGracePeriod { //Beginning of grace period, different time segment - //set previousCollection to currentCollection - s.previousCollection = s.currentCollection + //set previousDB to currentDB + s.previousDB = s.currentDB - //clear currentCollection so it is created when needed - s.currentCollection = nil + //clear currentDB so it is created when needed + //NOTE: we don't close the db as it can still be used as s.previousDB + s.currentDB = nil s.collectionMutex.Unlock() - } else if s.previousCollection != nil { + } else if s.previousDB != nil { //End of grace period, same segment //unlock the mutex immediately since we don't want to //hold the lock while we flush a buffer. - prevColl := s.previousCollection - s.previousCollection = nil + prevDB := s.previousDB + s.previousDB = nil s.collectionMutex.Unlock() - //Flush the previous collection and close it out - prevColl.Flush() - prevColl.Close() - err := s.ritaDBManager.MarkImportFinishedInMetaDB(prevColl.Database()) + //Close out the previous collection + //This will close the database sockets used by the object + err := prevDB.Close() + if err != nil { + errsOut <- err + break FlushLoop + } + + //Ensure the metadatabase is updated + err = prevDB.MarkFinished() if err != nil { errsOut <- err break FlushLoop @@ -201,9 +192,11 @@ FlushLoop: //This loop should only exit if there is an error (or the user shuts down the program) //wrap up the previous collection if it is open - if s.previousCollection != nil { - s.previousCollection.Flush() - s.previousCollection.Close() + if s.previousDB != nil { + err := s.previousDB.Close() + if err != nil { + errsOut <- err + } /* BUG: https://github.com/activecm/ipfix-rita/issues/35 err := s.ritaDBManager.MarkImportFinishedInMetaDB(s.previousCollection.Database()) @@ -214,9 +207,11 @@ FlushLoop: } //Wrap up the current collection - if s.currentCollection != nil { //could be nil due to error - s.currentCollection.Flush() - s.currentCollection.Close() + if s.currentDB != nil { //could be nil due to error + err := s.currentDB.Close() + if err != nil { + errsOut <- err + } /* BUG: https://github.com/activecm/ipfix-rita/issues/35 err := s.ritaDBManager.MarkImportFinishedInMetaDB(s.currentCollection.Database()) @@ -257,9 +252,9 @@ WriteLoop: var ritaConn parsetypes.Conn sess.ToRITAConn(&ritaConn, s.isIPLocal) - if s.currentCollection == nil { + if s.currentDB == nil { var err error - s.currentCollection, err = s.newAutoFlushCollection(s.currentSegmentTS.SegmentStartMillis, onFatal, errsOut) + s.currentDB, err = s.newRITADB(s.currentSegmentTS.SegmentStartMillis, onFatal, errsOut) if err != nil { errsOut <- errors.Wrap(err, "could not lazily initialize MongoDB output collection") break WriteLoop @@ -267,7 +262,7 @@ WriteLoop: } //Insert into today's db - err := s.currentCollection.Insert(ritaConn) + err := s.currentDB.InsertConnRecord(&ritaConn) if err != nil { errsOut <- errors.Wrap(err, "could not insert session into the current period collection") break WriteLoop @@ -276,11 +271,11 @@ WriteLoop: var ritaConn parsetypes.Conn sess.ToRITAConn(&ritaConn, s.isIPLocal) - if s.previousCollection == nil { + if s.previousDB == nil { prevTimeMillis := s.currentSegmentTS.SegmentStartMillis - s.currentSegmentTS.SegmentDurationMillis var err error - s.previousCollection, err = s.newAutoFlushCollection(prevTimeMillis, onFatal, errsOut) + s.previousDB, err = s.newRITADB(prevTimeMillis, onFatal, errsOut) if err != nil { errsOut <- errors.Wrap(err, "could not lazily initialize MongoDB output collection") break WriteLoop @@ -288,7 +283,7 @@ WriteLoop: } //Insert into yesterday's db - err := s.previousCollection.Insert(ritaConn) + err := s.previousDB.InsertConnRecord(&ritaConn) if err != nil { errsOut <- errors.Wrap(err, "could not insert session into the previous period collection") break WriteLoop diff --git a/converter/output/rita/streaming/dates/rita_dates_test.go b/converter/output/rita/streaming/dates/rita_dates_test.go index 6989d03..9e925e3 100644 --- a/converter/output/rita/streaming/dates/rita_dates_test.go +++ b/converter/output/rita/streaming/dates/rita_dates_test.go @@ -2,19 +2,20 @@ package dates_test import ( "fmt" - "testing" - "time" - "github.com/activecm/dbtest" "github.com/activecm/ipfix-rita/converter/environment" "github.com/activecm/ipfix-rita/converter/input" "github.com/activecm/ipfix-rita/converter/integrationtest" "github.com/activecm/ipfix-rita/converter/output" "github.com/activecm/ipfix-rita/converter/output/rita" + "github.com/activecm/ipfix-rita/converter/output/rita/constants" + "github.com/activecm/ipfix-rita/converter/output/rita/freqconn" "github.com/activecm/ipfix-rita/converter/stitching/session" "github.com/benbjohnson/clock" "github.com/globalsign/mgo/bson" "github.com/stretchr/testify/require" + "testing" + "time" ) func TestOutOfPeriodSessionsInGracePeriod(t *testing.T) { @@ -51,15 +52,15 @@ func TestOutOfPeriodSessionsInGracePeriod(t *testing.T) { prevDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + prevDBTime.Format(timeFormatString) targetDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + targetDBTime.Format(timeFormatString) - currDBCount, err := ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err := ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, currDBCount) - prevDBCount, err := ssn.DB(prevDBName).C(rita.RitaConnInputCollection).Count() + prevDBCount, err := ssn.DB(prevDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, prevDBCount) - targetDBCount, err := ssn.DB(targetDBName).C(rita.RitaConnInputCollection).Count() + targetDBCount, err := ssn.DB(targetDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, targetDBCount) ssn.Close() @@ -103,15 +104,15 @@ func TestOutOfPeriodSessionsOutOfGracePeriod(t *testing.T) { prevDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + prevDBTime.Format(timeFormatString) targetDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + targetDBTime.Format(timeFormatString) - currDBCount, err := ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err := ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, currDBCount) - prevDBCount, err := ssn.DB(prevDBName).C(rita.RitaConnInputCollection).Count() + prevDBCount, err := ssn.DB(prevDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, prevDBCount) - targetDBCount, err := ssn.DB(targetDBName).C(rita.RitaConnInputCollection).Count() + targetDBCount, err := ssn.DB(targetDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, targetDBCount) ssn.Close() @@ -149,12 +150,12 @@ func TestPreviousPeriodSessionsInGracePeriod(t *testing.T) { currDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + currDBTime.Format(timeFormatString) prevDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + prevDBTime.Format(timeFormatString) - currDBCount, err := ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err := ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, currDBCount) - fmt.Printf("Checking %s.%s", prevDBName, rita.RitaConnInputCollection) - prevDBCount, err := ssn.DB(prevDBName).C(rita.RitaConnInputCollection).Count() + fmt.Printf("Checking %s.%s", prevDBName, constants.ConnCollection) + prevDBCount, err := ssn.DB(prevDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), prevDBCount) @@ -203,15 +204,15 @@ func TestPreviousPeriodSessionsOutOfGracePeriod(t *testing.T) { prevDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + prevDBTime.Format(timeFormatString) targetDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + targetDBTime.Format(timeFormatString) - currDBCount, err := ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err := ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, currDBCount) - prevDBCount, err := ssn.DB(prevDBName).C(rita.RitaConnInputCollection).Count() + prevDBCount, err := ssn.DB(prevDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, prevDBCount) - targetDBCount, err := ssn.DB(targetDBName).C(rita.RitaConnInputCollection).Count() + targetDBCount, err := ssn.DB(targetDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, targetDBCount) ssn.Close() @@ -251,15 +252,15 @@ func TestCurrentPeriodSessionsInGracePeriod(t *testing.T) { prevDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + prevDBTime.Format(timeFormatString) targetDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + targetDBTime.Format(timeFormatString) - currDBCount, err := ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err := ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), currDBCount) - prevDBCount, err := ssn.DB(prevDBName).C(rita.RitaConnInputCollection).Count() + prevDBCount, err := ssn.DB(prevDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, prevDBCount) - targetDBCount, err := ssn.DB(targetDBName).C(rita.RitaConnInputCollection).Count() + targetDBCount, err := ssn.DB(targetDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), targetDBCount) ssn.Close() @@ -307,15 +308,15 @@ func TestCurrentPeriodSessionsOutOfGracePeriod(t *testing.T) { prevDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + prevDBTime.Format(timeFormatString) targetDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + targetDBTime.Format(timeFormatString) - currDBCount, err := ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err := ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), currDBCount) - prevDBCount, err := ssn.DB(prevDBName).C(rita.RitaConnInputCollection).Count() + prevDBCount, err := ssn.DB(prevDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, prevDBCount) - targetDBCount, err := ssn.DB(targetDBName).C(rita.RitaConnInputCollection).Count() + targetDBCount, err := ssn.DB(targetDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), targetDBCount) ssn.Close() @@ -370,15 +371,15 @@ func TestGracePeriodFlip(t *testing.T) { //slow machines time.Sleep(waitTime) - prevDBCount, err := ssn.DB(prevDBName).C(rita.RitaConnInputCollection).Count() + prevDBCount, err := ssn.DB(prevDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), prevDBCount) - currDBCount, err := ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err := ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, currDBCount) - nextDBCount, err := ssn.DB(nextDBName).C(rita.RitaConnInputCollection).Count() + nextDBCount, err := ssn.DB(nextDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, nextDBCount) @@ -395,15 +396,15 @@ func TestGracePeriodFlip(t *testing.T) { //slow machines time.Sleep(waitTime) - prevDBCount, err = ssn.DB(prevDBName).C(rita.RitaConnInputCollection).Count() + prevDBCount, err = ssn.DB(prevDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), prevDBCount) - currDBCount, err = ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err = ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, currDBCount) - nextDBCount, err = ssn.DB(nextDBName).C(rita.RitaConnInputCollection).Count() + nextDBCount, err = ssn.DB(nextDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, nextDBCount) @@ -423,15 +424,15 @@ func TestGracePeriodFlip(t *testing.T) { //slow machines time.Sleep(waitTime) - prevDBCount, err = ssn.DB(prevDBName).C(rita.RitaConnInputCollection).Count() + prevDBCount, err = ssn.DB(prevDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), prevDBCount) - currDBCount, err = ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err = ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), currDBCount) - nextDBCount, err = ssn.DB(nextDBName).C(rita.RitaConnInputCollection).Count() + nextDBCount, err = ssn.DB(nextDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 0, nextDBCount) @@ -444,15 +445,15 @@ func TestGracePeriodFlip(t *testing.T) { //slow machines time.Sleep(waitTime) - prevDBCount, err = ssn.DB(prevDBName).C(rita.RitaConnInputCollection).Count() + prevDBCount, err = ssn.DB(prevDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), prevDBCount) - currDBCount, err = ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err = ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), currDBCount) - nextDBCount, err = ssn.DB(nextDBName).C(rita.RitaConnInputCollection).Count() + nextDBCount, err = ssn.DB(nextDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, int(bufferSize), nextDBCount) @@ -493,7 +494,7 @@ func TestBufferFlushOnClose(t *testing.T) { env := fixtures.Get(integrationtest.EnvironmentFixture.Key).(environment.Environment) currDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + currDBTime.Format(timeFormatString) - currDBCount, err := ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err := ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 4, currDBCount) ssn.Close() @@ -534,7 +535,7 @@ func TestBufferFlushOnTimeout(t *testing.T) { waitTime := 10 * time.Second time.Sleep(waitTime) - currDBCount, err := ssn.DB(currDBName).C(rita.RitaConnInputCollection).Count() + currDBCount, err := ssn.DB(currDBName).C(constants.ConnCollection).Count() require.Nil(t, err) require.Equal(t, 4, currDBCount) ssn.Close() @@ -594,7 +595,7 @@ func TestMetaDBRecords(t *testing.T) { time.Sleep(waitTime) dbInfo := rita.DBMetaInfo{} - ssn.DB(env.GetOutputConfig().GetRITAConfig().GetMetaDB()).C(rita.MetaDBDatabasesCollection).Find( + ssn.DB(env.GetOutputConfig().GetRITAConfig().GetMetaDB()).C(constants.MetaDBDatabasesCollection).Find( bson.M{"name": prevDBName}, ).One(&dbInfo) @@ -610,7 +611,7 @@ func TestMetaDBRecords(t *testing.T) { time.Sleep(waitTime) dbInfo = rita.DBMetaInfo{} - ssn.DB(env.GetOutputConfig().GetRITAConfig().GetMetaDB()).C(rita.MetaDBDatabasesCollection).Find( + ssn.DB(env.GetOutputConfig().GetRITAConfig().GetMetaDB()).C(constants.MetaDBDatabasesCollection).Find( bson.M{"name": currDBName}, ).One(&dbInfo) @@ -622,7 +623,7 @@ func TestMetaDBRecords(t *testing.T) { time.Sleep(waitTime) dbInfo = rita.DBMetaInfo{} - ssn.DB(env.GetOutputConfig().GetRITAConfig().GetMetaDB()).C(rita.MetaDBDatabasesCollection).Find( + ssn.DB(env.GetOutputConfig().GetRITAConfig().GetMetaDB()).C(constants.MetaDBDatabasesCollection).Find( bson.M{"name": prevDBName}, ).One(&dbInfo) @@ -641,7 +642,7 @@ func TestMetaDBRecords(t *testing.T) { time.Sleep(waitTime) dbInfo = rita.DBMetaInfo{} - ssn.DB(env.GetOutputConfig().GetRITAConfig().GetMetaDB()).C(rita.MetaDBDatabasesCollection).Find( + ssn.DB(env.GetOutputConfig().GetRITAConfig().GetMetaDB()).C(constants.MetaDBDatabasesCollection).Find( bson.M{"name": nextDBName}, ).One(&dbInfo) @@ -653,7 +654,7 @@ func TestMetaDBRecords(t *testing.T) { time.Sleep(waitTime) dbInfo = rita.DBMetaInfo{} - ssn.DB(env.GetOutputConfig().GetRITAConfig().GetMetaDB()).C(rita.MetaDBDatabasesCollection).Find( + ssn.DB(env.GetOutputConfig().GetRITAConfig().GetMetaDB()).C(constants.MetaDBDatabasesCollection).Find( bson.M{"name": currDBName}, ).One(&dbInfo) @@ -727,3 +728,108 @@ func generateNSessions(n int64, targetSessionEnd time.Time) []session.Aggregate } return sessions } + +func TestStrobes(t *testing.T) { + fixtures := fixtureManager.BeginTest(t) + defer fixtureManager.EndTest(t) + + env := fixtures.Get(integrationtest.EnvironmentFixture.Key).(environment.Environment) + strobeLimit := env.GetOutputConfig().GetRITAConfig().GetStrobe().GetConnectionLimit() + expectedConnCount := strobeLimit + 5 + + ritaWriter := fixtures.GetWithSkip(t, streamingRITATimeIntervalWriterFixture.Key).(output.SessionWriter) + //clock starts outside of the grace period + clock := fixtures.Get(clockFixture.Key).(*clock.Mock) + + //don't adjust clock so db names align with intervals + currDBTime := clock.Now().In(timezone) + prevDBTime := clock.Now().In(timezone).Add(-1 * time.Duration(intervalLengthMillis) * time.Millisecond) + targetDBTime := currDBTime + + //clock starts outside of the grace period + //This is the "mostly standard" setup + clock.Add(time.Duration(gracePeriodCutoffMillis) * time.Millisecond) + + sessionChan := make(chan *session.Aggregate, expectedConnCount) + + //Set the end time half way through the non grace period + targetDBTimeWithOffset := targetDBTime.Add(time.Duration(gracePeriodCutoffMillis/2) * time.Millisecond) + sourceIP := "1.1.1.1" + destIP := "2.2.2.2" + + sessions := make([]session.Aggregate, expectedConnCount) + + for i := 0; i < expectedConnCount; i++ { + var sessA session.Aggregate + var sessB session.Aggregate + + //create a mock flow + a := input.NewFlowMock() + b := &input.FlowMock{} + //set the targeted session end timestamp + a.MockFlowEndMilliseconds = targetDBTimeWithOffset.UnixNano() / 1000000 + a.MockFlowStartMilliseconds = a.MockFlowEndMilliseconds - 10*1000 + //set the ports so A will be the source. The port numbers are used to determine + //which flow originated the connection when the timestamps are the same + a.MockSourceIPAddress = sourceIP + a.MockDestinationIPAddress = destIP + a.MockSourcePort = 20000 + a.MockDestinationPort = 80 + //Fill out b + *b = *a + b.MockDestinationIPAddress = a.MockSourceIPAddress + b.MockSourceIPAddress = a.MockDestinationIPAddress + b.MockDestinationPort = a.MockSourcePort + b.MockSourcePort = a.MockDestinationPort + + session.FromFlow(a, &sessA) + session.FromFlow(b, &sessB) + sessA.Merge(&sessB) + + sessions[i] = sessA + } + + for i := range sessions { + sessionChan <- &sessions[i] + } + close(sessionChan) + + errs := ritaWriter.Write(sessionChan) + + mongoContainer := fixtures.GetWithSkip(t, mongoContainerFixtureKey).(dbtest.MongoDBContainer) + ssn, err := mongoContainer.NewSession() + if err != nil { + t.Fatal(err) + } + + for err = range errs { + t.Fatal(err) + } + + //ensure conn collections are empty in each possible db + currDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + currDBTime.Format(timeFormatString) + prevDBName := env.GetOutputConfig().GetRITAConfig().GetDBRoot() + "-" + prevDBTime.Format(timeFormatString) + + currDBCount, err := ssn.DB(currDBName).C(constants.ConnCollection).Count() + require.Nil(t, err) + require.Equal(t, 0, currDBCount) + + prevDBCount, err := ssn.DB(prevDBName).C(constants.ConnCollection).Count() + require.Nil(t, err) + require.Equal(t, 0, prevDBCount) + + //ensure freqconn is empty for previous db + prevDBFreqCount, err := ssn.DB(prevDBName).C(constants.StrobesCollection).Count() + require.Nil(t, err) + require.Equal(t, 0, prevDBFreqCount) + + //ensure freqconn is correct for current db + var freq freqconn.FreqConn + err = ssn.DB(currDBName).C(constants.StrobesCollection).Find(nil).One(&freq) + require.Nil(t, err) + require.Equal(t, sourceIP, freq.Src) + require.Equal(t, destIP, freq.Dst) + require.Equal(t, expectedConnCount, freq.ConnectionCount) + + ssn.Close() +} diff --git a/runtime/etc/converter/converter.yaml b/runtime/etc/converter/converter.yaml index 0d3be2c..94666f7 100644 --- a/runtime/etc/converter/converter.yaml +++ b/runtime/etc/converter/converter.yaml @@ -16,6 +16,20 @@ Output: # This database holds information about RITA managed databases. MetaDB: MetaDatabase + Strobe: + # This sets the maximum number of connections between any two given hosts that are stored. + # Connections above this limit will be deleted and not used in other analysis modules. This will + # also trigger an entry in the strobe module. A lower value will reduce analysis time and + # hide more potential false positives from other modules. A higher value will increase + # analysis time, increase false positives, but reduce the risk of false negatives. + # Recommended values for this setting are: + # 86400 - One connection every second for 24 hours + # 250000 - (Default) Good middle of the road value + # 700000 - Safe max value that is unlikely to cause errors + # The theoretical limit due to implementation limitations is ~1,048,573 + # but in practice timeouts have occurred at lower values. + ConnectionLimit: 250000 + Filtering: # These are filters that affect which flows are processed and which # are dropped.