Skip to content
This repository was archived by the owner on Jan 29, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions converter/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion converter/Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

[[constraint]]
name = "github.com/activecm/rita"
version = "^1.0.3"
version = "^2.0.0"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have been updated earlier, but it the change was needed now as we have to get the parsetypes.Freq object out of RITA to get the needed MongoDB indexes for the freqConn collection


[[constraint]]
name = "github.com/activecm/mgosec"
Expand Down
4 changes: 2 additions & 2 deletions converter/commands/check_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package commands

import (
"fmt"

"github.com/activecm/ipfix-rita/converter/config"
"github.com/activecm/ipfix-rita/converter/config/yaml"
"github.com/activecm/ipfix-rita/converter/input/logstash/mongodb"
"github.com/activecm/ipfix-rita/converter/output/rita"
"github.com/urfave/cli"
"time"
)

func init() {
Expand Down Expand Up @@ -53,7 +53,7 @@ func init() {
fmt.Printf("Found %d Flow Records Ready For Processing\n", count)
coll.Database.Session.Close()

outDB, err := rita.NewOutputDB(conf.GetOutputConfig().GetRITAConfig())
outDB, err := rita.NewDBManager(conf.GetOutputConfig().GetRITAConfig(), 100, 1*time.Second)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to add some dummy values to make the constructor happy.

if err != nil {
return cli.NewExitError(fmt.Sprintf("%+v\n", err), 1)
}
Expand Down
7 changes: 7 additions & 0 deletions converter/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ type RITA interface {
GetConnectionConfig() MongoDBConnection
GetDBRoot() string
GetMetaDB() string
GetStrobe() Strobe
}

//Strobe contains configuration for populating the
//freqConn collection / Strobes analysis in RITA
type Strobe interface {
GetConnectionLimit() int
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally had to touch a few files to implement the ConnectionLimit config option for strobes as in RITA.

Changed:
converter/config/config.go
converter/config/yaml/output.go
converter/integrationtest/config.go
converter/etc/config.yaml
runtime/etc/converter/converter.yaml


//Filtering contains information on local subnets and other networks/hosts
Expand Down
14 changes: 14 additions & 0 deletions converter/config/yaml/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type ritaMongoDB struct {
MongoDB mongoDBConnection `yaml:"MongoDB-Connection"`
DBRoot string `yaml:"DBRoot"`
MetaDB string `yaml:"MetaDB"`
Strobe strobe `yaml:"Strobe"`
}

func (r *ritaMongoDB) GetConnectionConfig() config.MongoDBConnection {
Expand All @@ -29,3 +30,16 @@ func (r *ritaMongoDB) GetDBRoot() string {
func (r *ritaMongoDB) GetMetaDB() string {
return r.MetaDB
}

func (r *ritaMongoDB) GetStrobe() config.Strobe {
return &r.Strobe
}

//strobe implements config.Strobe
type strobe struct {
ConnectionLimit int `yaml:"ConnectionLimit"`
}

func (s *strobe) GetConnectionLimit() int {
return s.ConnectionLimit
}
15 changes: 15 additions & 0 deletions converter/etc/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@ Output:
# This database holds information about RITA managed databases.
MetaDB: MetaDatabase

Strobe:
# This sets the maximum number of connections between any two given hosts that are stored.
# Connections above this limit will be deleted and not used in other analysis modules. This will
# also trigger an entry in the strobe module. A lower value will reduce analysis time and
# hide more potential false positives from other modules. A higher value will increase
# analysis time, increase false positives, but reduce the risk of false negatives.
# Recommended values for this setting are:
# 86400 - One connection every second for 24 hours
# 250000 - (Default) Good middle of the road value
# 700000 - Safe max value that is unlikely to cause errors
# The theoretical limit due to implementation limitations is ~1,048,573
# but in practice timeouts have occurred at lower values.
ConnectionLimit: 250000


Filtering:
# These are filters that affect which flows are processed and which
# are dropped.
Expand Down
13 changes: 11 additions & 2 deletions converter/integrationtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,21 @@ func (t *OutputConfig) GetRITAConfig() config.RITA { return &t.rita }
//RitaConfig implements config.RITA
type RitaConfig struct {
mongoDB MongoDBConfig
strobe StrobeConfig
}

func (r *RitaConfig) GetConnectionConfig() config.MongoDBConnection { return &r.mongoDB }

func (r *RitaConfig) GetDBRoot() string { return "RITA" }
func (r *RitaConfig) GetMetaDB() string { return "MetaDatabase" }
func (r *RitaConfig) GetDBRoot() string { return "RITA" }
func (r *RitaConfig) GetMetaDB() string { return "MetaDatabase" }
func (r *RitaConfig) GetStrobe() config.Strobe { return &r.strobe }

type StrobeConfig struct{}

func (s *StrobeConfig) GetConnectionLimit() int {
//Lowered from the usual 250000 so tests involving this limit don't take forever.
return 100
}

//FilteringConfig implements config.Filtering
type FilteringConfig struct{}
Expand Down
72 changes: 29 additions & 43 deletions converter/output/rita/batch/dates/rita_dates.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/activecm/ipfix-rita/converter/logging"
"github.com/activecm/ipfix-rita/converter/output"
"github.com/activecm/ipfix-rita/converter/output/rita"
"github.com/activecm/ipfix-rita/converter/output/rita/buffered"
"github.com/activecm/ipfix-rita/converter/stitching/session"
"github.com/activecm/rita/parser/parsetypes"
"github.com/pkg/errors"
Expand All @@ -23,14 +22,12 @@ import (
//before being sent to MongoDB. The buffers are flushed when
//they are full or after a deadline passes for the individual buffer.
type batchRITAConnDateWriter struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch writer has to be activated with a cli flag.
It sends each connection to a timestamp based database and marks them ready for import with ipfix-rita closes. It does not do any database rotation. It has been updated to support strobes via the rita.DB class.

Largely references to the AutoFlushCollection directly tied to the conn collection have been replaced with calls to rita.DB

db rita.OutputDB
localNets []net.IPNet
outputCollections map[string]*buffered.AutoFlushCollection
bufferSize int64
autoFlushTime time.Duration
autoFlushContext context.Context
autoFlushOnFatal func()
log logging.Logger
db rita.DBManager
localNets []net.IPNet
outputDBs map[string]rita.DB
autoFlushContext context.Context
autoFlushOnFatal func()
log logging.Logger
}

//NewBatchRITAConnDateWriter creates an buffered RITA compatible writer
Expand All @@ -40,22 +37,20 @@ type batchRITAConnDateWriter struct {
//when the buffer is full or after a deadline passes.
func NewBatchRITAConnDateWriter(ritaConf config.RITA, localNets []net.IPNet,
bufferSize int64, autoFlushTime time.Duration, log logging.Logger) (output.SessionWriter, error) {
db, err := rita.NewOutputDB(ritaConf)
db, err := rita.NewDBManager(ritaConf, bufferSize, autoFlushTime)
if err != nil {
return nil, errors.Wrap(err, "could not connect to RITA MongoDB")
}

autoFlushContext, autoFlushOnFail := context.WithCancel(context.Background())
//return the new writer
return &batchRITAConnDateWriter{
db: db,
localNets: localNets,
outputCollections: make(map[string]*buffered.AutoFlushCollection),
bufferSize: bufferSize,
autoFlushTime: autoFlushTime,
autoFlushContext: autoFlushContext,
autoFlushOnFatal: autoFlushOnFail,
log: log,
db: db,
localNets: localNets,
outputDBs: make(map[string]rita.DB),
autoFlushContext: autoFlushContext,
autoFlushOnFatal: autoFlushOnFail,
log: log,
}, nil
}

Expand Down Expand Up @@ -90,14 +85,14 @@ func (r *batchRITAConnDateWriter) Write(sessions <-chan *session.Aggregate) <-ch
sess.ToRITAConn(&connRecord, r.isIPLocal)

//create/ get the buffered output collection
outColl, err := r.getConnCollectionForSession(sess, errs, r.autoFlushOnFatal)
outDB, err := r.getDBForSession(sess, errs, r.autoFlushOnFatal)
if err != nil {
errs <- err
break WriteLoop
}

//insert the record
err = outColl.Insert(connRecord)
err = outDB.InsertConnRecord(&connRecord)
if err != nil {
errs <- err
break WriteLoop
Expand All @@ -109,11 +104,13 @@ func (r *batchRITAConnDateWriter) Write(sessions <-chan *session.Aggregate) <-ch
}

func (r *batchRITAConnDateWriter) closeDBSessions(errs chan<- error) {
for i := range r.outputCollections {
r.outputCollections[i].Close()
for i := range r.outputDBs {
err := r.outputDBs[i].Close()
if err != nil {
errs <- err
}

err := r.db.MarkImportFinishedInMetaDB(r.outputCollections[i].Database())
//stops outputCollections from sending on errs
err = r.outputDBs[i].MarkFinished()
if err != nil {
errs <- err
}
Expand All @@ -132,39 +129,28 @@ func (r *batchRITAConnDateWriter) isIPLocal(ipAddrStr string) bool {
return false
}

func (r *batchRITAConnDateWriter) getConnCollectionForSession(sess *session.Aggregate,
autoFlushAsyncErrChan chan<- error, autoFlushOnFatal func()) (*buffered.AutoFlushCollection, error) {
func (r *batchRITAConnDateWriter) getDBForSession(sess *session.Aggregate,
autoFlushAsyncErrChan chan<- error, autoFlushOnFatal func()) (rita.DB, error) {

//get the latest flowEnd time
endTimeMilliseconds := sess.FlowEndMilliseconds()
//time.Unix(seconds, nanoseconds)
//1000 milliseconds per second, 1000 nanosecodns to a microsecond. 1000 microseconds to a millisecond
//1000 milliseconds per second, 1000 nanoseconds to a microsecond. 1000 microseconds to a millisecond
endTime := time.Unix(endTimeMilliseconds/1000, (endTimeMilliseconds%1000)*1000*1000)
endTimeStr := endTime.Format("2006-01-02")

//cache the database connection
outBufferedColl, ok := r.outputCollections[endTimeStr]
outDB, ok := r.outputDBs[endTimeStr]
if !ok {
//connect to the db
var err error
outColl, err := r.db.NewRITAOutputConnection(endTimeStr)
outDB, err := r.db.NewRitaDB(endTimeStr, autoFlushAsyncErrChan, autoFlushOnFatal)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewRITADB encapsulates the removed logic.

if err != nil {
return nil, errors.Wrapf(err, "could not connect to output database for suffix: %s", endTimeStr)
return outDB, errors.Wrapf(err, "could not create output database for suffix: %s", endTimeStr)
}

//create the meta db record
err = r.db.EnsureMetaDBRecordExists(outColl.Database.Name)
if err != nil {
outColl.Database.Session.Close()
return nil, err
}

//create the output buffer
outBufferedColl = buffered.NewAutoFlushCollection(outColl, r.bufferSize, r.autoFlushTime)
outBufferedColl.StartAutoFlush(autoFlushAsyncErrChan, autoFlushOnFatal)

//cache the result
r.outputCollections[endTimeStr] = outBufferedColl
r.outputDBs[endTimeStr] = outDB
}
return outBufferedColl, nil
return outDB, nil
}
7 changes: 5 additions & 2 deletions converter/output/rita/buffered/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package buffered
import (
"sync"

"github.com/pkg/errors"
mgo "github.com/globalsign/mgo"
"github.com/pkg/errors"
)

//Collection wraps an *mgo.Collection in order
Expand All @@ -16,7 +16,10 @@ type Collection struct {
}

//InitializeCollection wraps a *mgo.Collection with a buffer of a given size
//for performing buffered insertions.
//for performing buffered insertions. Note the Collection.Close() method
//closes the socket used by the collection handle. You may want to
//copy the initial connection before passing the handle to this
//constructor.
func InitializeCollection(coll *Collection, mgoCollection *mgo.Collection, bufferSize int64) {
coll.mgoCollection = mgoCollection
coll.buffer = make([]interface{}, 0, bufferSize)
Expand Down
16 changes: 16 additions & 0 deletions converter/output/rita/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package constants
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the strobes code is separate from the rita db manager code, and both sets of code needed access to these constants, a new package was created to hold them. This prevents import cycles.


//MetaDBDatabasesCollection is the name of the RITA collection
//in the RITA MetaDB that keeps track of RITA managed databases
const MetaDBDatabasesCollection = "databases"

//StrobesCollection contains the name for the RITA freqConn MongoDB collection
const StrobesCollection = "freqConn"

//ConnCollection contains the name for the RITA conn MongoDB collection
const ConnCollection = "conn"

// Version specifies which RITA DB schema the resulting data matches
var Version = "v2.0.0+ActiveCM-IPFIX"

// TODO: Use version in RITA as dep
Loading