MonsterDB – Streaming

Make your data beautiful again.

About Streaming

Streaming enables you to start with a collection, or a subset of a collection, of documents and process those documents efficiently, performing various types of data enhancement along the way. These enhancements can vary from filtering using standard selection criteria, through geographical selection, to fuzzy searching. Once you have made a selection it is possible to aggregate, cluster, classify or expand the data in any order, and often in repeated steps in the stream. Data is only processed when it is needed, rather like searching the web: when we click the next button, information is processed on demand to satisfy the user's need. The same happens in a monsterDB stream – if you need only the first 10 results then only 10 will be processed for you. Obviously some processes, such as aggregation or splitting, require the entire dataset to be processed, but this will be done efficiently to make the best use of memory and processing.

Aggregation: Perform aggregation to summarize, sort, bucket or otherwise group the data.

Expanding: It is also possible to expand the data using relationships formed between documents, such as parent-child or ownership relationships. This gives us the ability to bring family members, households, company groups and so on into our focus. The stream can also be expanded by bringing in data from other collections using union operations or lookup (join-type) operations.

Clustering: We can use fuzzy matching to cluster the data into logical groups where there are similarities, before coercing them into conceptual business objects such as person, vendor or product, using the information from the fuzzy matching and relationships to complete the picture for the concepts.

Machine Learning: It is also possible to run classification (machine-learning) algorithms on the data to build Bayesian networks or decision trees from the information in the stream, and then to predict the actions or results on new data using those models.

How a stream statement is formed:

stream.collection.step1() | step2() | ..... | stepN()

The steps in the statement can be any of the following reserved words:

match, find, limit, lookup, minus, skip, group, out, sort, bucket, fuzzy, unwind, filter, spinOut,  getRelated, rematch, analyze, classifierBuild, classifierPredict, classifierTree, arrf, coerce, compare, count, first, last, between, cluster, project, switch, union, empty, new, restGet and split.

Each stage in the stream is executed in the order passed. The stages are responsible for processing as defined in the sections below, but generally each stage consumes a set of documents and produces another smaller, larger or empty set of documents. It is normal for the first stage to be a find (aka match) step, which identifies the documents the stream will initially treat; if it is omitted, the optimiser assumes the user wishes to inspect the whole collection. It is generally better to filter out a subset to work from if the stream will later perform a memory-intensive process and then discard part of those results. However, if you are not aggregating or splitting the stream on any criteria, it should make little difference to performance, as documents filtered out by subsequent steps will not be extracted (i.e. the criteria enable the stream to ignore the document).
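The lazy, pull-based behaviour described above can be sketched in Python (an illustration of the semantics only, not monsterDB code): when a limit of 10 results is requested, the source only ever reads as many documents as are needed to satisfy it.

```python
from itertools import islice

processed = []

def scan(docs):
    # Source stage: yields documents one at a time (lazily)
    for doc in docs:
        processed.append(doc)  # record that the source actually read this document
        yield doc

def find(docs, predicate):
    # Filter stage: passes through only documents matching the predicate
    return (d for d in docs if predicate(d))

collection = [{"ROWID": i} for i in range(1000)]

# Equivalent in spirit to: stream.collection.find({ROWID: {$gte: 5}}) | limit(10)
stream = find(scan(collection), lambda d: d["ROWID"] >= 5)
first_ten = list(islice(stream, 10))

print(len(first_ten))   # 10 results were delivered...
print(len(processed))   # ...but only 15 source documents were ever read
```

Because each stage pulls from the previous one on demand, the remaining 985 documents are never touched.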

Some stages will reduce the number of documents produced, for example “group”, “bucket”, “filter”, “limit”, “skip”, “first”, “last”, “between” and “count”, and some may increase the number of documents produced, for example “unwind”, “getRelated”, “union” and “switch”.

Terminal Operation (caching)

Stream processing does not need to have a terminal operation at all; if one isn't included, the result will be directed to the screen or returned as a DBCursor to the calling process. However, there is a special operation, “out”, that redirects the output to a collection, causing the stream to be consumed; the result of the stream will then be empty.

Resuming after a Terminal Operation (cache-resume operation)

A terminal operation removes all documents from the stream by processing them, in the same way that an aggregation reduces the number of documents available to process. In these situations you can refresh the stream from the same collection or a different one using the switch stage, which creates a new stream from the (filtered or unfiltered) contents of the specified collection. There is no specific reason why this needs to be done within a single stream, as the same effect could be achieved by running a subsequent command; however, it has been included as a way to provide a cache-resume step in a pipeline when storage of intermediate data is required, i.e.

stream.collection.match(...) | step1() ... out("collection2") | switch("collection2") | step2() | out("collection3") ... etc


stream.gleif.match({}) | out("test") | switch({to: "test", filter: {_id: 'fbde2618-4724-49c6-aab7-ca5d8bfe4f23'}})

Example Data Used

Examples used in this page:

Collection: People

{"FirstName":"Robert","LastName":"Smith","ROWID":3,"salary":2013,"startDate":"Mar 4, 2013 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"b7bbfd77-dce8-4a92-9b46-2ed2379eaf0a"}
{"FirstName":"Bob","LastName":"Smith","ROWID":4,"salary":2014,"startDate":"Mar 4, 2014 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"dc2b4007-1d93-43bd-ac13-0385ab0bd79e"}
{"FirstName":"Bob","LastName":"Haynes","ROWID":2,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"b8db302d-6ba1-429d-9b6a-f50c198845c3"}
{"FirstName":"Albi","LastName":"Haynes","ROWID":1,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"2c0bf70f-01d4-45a2-9d33-6b07357c5edf"}

Collection: Exchange


Initial Selection of records for a Stream

find (aka match) – deterministic selection

Format : find({query})

Follows the same usage pattern as the find(query) command in the collection definition.
A query has the form:

{ aList.columnName: "a value", numericColumn: {$gt: 0}, textColumn: /regex/, locationColumn: { $geoWithin: { ... }} }
{ $or: [ {...}, {...} ] }
{ $and: [ {...}, {...} ] }

The output produced will be a unique set of documents: a document will only be produced once, and if the query is too specific no output will be produced. Geospatial selectors can be used in the find stage to select documents based on their location:

stream.collection.find( { location: { $geoWithin: {type: "Polygon", coordinates: [ [ [ 41 , 5 ] , .... [ 41 ,  5 ] ] ] } } } ) | ...


If your first step in the stream is not a find (match) stage, then the entire collection will be used to start the stream. The same result can be achieved by using a find with an empty filter document:

stream.collection.find({}) | ...
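As a rough illustration of how deterministic selection evaluates a query document against each record, here is a minimal Python sketch supporting equality, $gt and regex conditions (dotted paths, $geoWithin, $or/$and and the other operators are omitted; this is not monsterDB's implementation):

```python
import re

def matches(doc, query):
    # Minimal sketch of deterministic matching: each query property must hold
    for field, cond in query.items():
        value = doc.get(field)
        if isinstance(cond, dict) and "$gt" in cond:
            # numeric comparison operator
            if value is None or not value > cond["$gt"]:
                return False
        elif isinstance(cond, re.Pattern):
            # regex condition against string values
            if not isinstance(value, str) or not cond.search(value):
                return False
        elif value != cond:
            # plain equality
            return False
    return True

doc = {"LastName": "Smith", "salary": 2013}
print(matches(doc, {"LastName": "Smith", "salary": {"$gt": 1000}}))  # True
print(matches(doc, {"LastName": re.compile("Sm")}))                  # True
print(matches(doc, {"salary": {"$gt": 5000}}))                       # False
```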


fuzzy({Index: "indexname"})

Whilst similar to the find stage, the fuzzy stage will not use the standard indexes; it uses only the fuzzy indexes, allowing it to find records that appear to have some similarity with the search query.
An example for this would be:

fuzzy({Entity:{LegalName:"INTERAGROS VP, a.s."}})

The optimiser will pass the query to the fuzzy interpreter, which will take into account any rules that apply to the filter above and return any records that match it:

Pipeline stage to match using Index Name CompanyNameIdx2:


would yield 2 matches:


These results indicate that there are two identical records matching the match rules, and show the user the designated action that the fuzzy match rules state should be taken with them. The _id field is the record that matched something, and the _id field inside the Matches list is the _id of the matched record or records.

restGet({url: "..."})

restGet calls a RESTful interface (web service). It can read its configuration from the options passed or from the previous step; if it reads from the previous step, the REST interface is called exactly once for each document seen from that step. Because the stream is lazy, the calls stop if the stream is terminated early.

stream.collection.new({url: 'http://localhost:8080/Rest/api/rest/fsc/byProfile/128817'}) | restGet() --- runs once for each document sent – in this case 1 time
stream.collection.find() | restGet() --- runs for each document sent to it that contains a url property
stream.collection.restGet({url: 'http://localhost:8080/Rest/api/rest/fsc/byProfile/128817'}) --- runs once for this url

new({document})

Creates a new singleton document in a stream; the document will be exactly as you specify in the parameter:

stream.collection.new({property: 1, property2: "two"})

empty()

Creates an empty stream, or replaces the current stream with one.

stream.collection.find() | empty()

Due to the laziness of the processing, the above find would not run.


readFile({...})

Reads a local text file and creates a stream from it.

 stream.sets.readFile({"type": "csv", "filename": "file.csv", "delimiter": ",", "charset": "utf-8"})

The properties are:

  • type: csv or json
  • filename: the file on the local machine
  • For CSV:
      • delimiter: the single character delimiter
      • charset: the local character set
      • quote: the single character quote
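The behaviour of the CSV mode can be pictured with a small Python sketch (illustrative only; note that, unlike JSON input, CSV values arrive as strings):

```python
import csv
import io

# Hypothetical file contents standing in for "file.csv"
raw = "FirstName,LastName,ROWID\nAlbi,Haynes,1\nBob,Haynes,2\n"

def read_csv(text, delimiter=",", quote='"'):
    # Each row becomes one document keyed by the header line,
    # mirroring readFile({type: "csv", delimiter: ",", ...})
    reader = csv.DictReader(io.StringIO(text), delimiter=delimiter, quotechar=quote)
    for row in reader:
        yield dict(row)

docs = list(read_csv(raw))
print(docs[0])   # {'FirstName': 'Albi', 'LastName': 'Haynes', 'ROWID': '1'}
```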

Expanding the stream

One of the aspects of the monsterDB stream is the ability to expand it to include data from other sources or from relationships; this can take the form of more documents, or of additional data (properties) on the documents in the current stream.

union({with: "collection", filter: {...}})

This will append one output to another. The appendage is specified by the parameter document, whose properties are with and filter, where filter is optional:

stream.collection.find({}) | union({with: "test", filter: {}})
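The union semantics – append the (optionally filtered) contents of another collection to the current stream – can be sketched in Python:

```python
from itertools import chain

def union(stream, with_docs, flt=None):
    # Append the (optionally filtered) contents of another collection
    matching = (d for d in with_docs
                if flt is None or all(d.get(k) == v for k, v in flt.items()))
    return chain(stream, matching)

people = [{"_id": 1, "LastName": "Smith"}]
test_coll = [{"_id": 2, "kind": "a"}, {"_id": 3, "kind": "b"}]

result = list(union(iter(people), test_coll, {"kind": "a"}))
print(len(result))        # 2: the original document plus the one matching the filter
print(result[1]["_id"])   # 2
```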



switch({to: "collection", filter: {...}})

Switches the stream to another selection. Be careful to ensure the previous steps have a terminal operation, otherwise you cannot guarantee the documents will be processed.

stream.gleif.match({}) | out("test") | switch({to: "test"})
stream.gleif.match({}) | out("test") | switch({to: "test", filter: {_id: 'fbde2618-4724-49c6-aab7-ca5d8bfe4f23'}})
unwind({path: "fieldName"})

Where you have an output that includes an array (list) within the document, for example the output from our earlier fuzzy search where the matches were produced in a list (Matches), we can use the unwind stage to extract the matches and produce pairs of records:

As an example, if the current stream contains data such as this with Matches as an array inside each document:

{"_id":"3","Matches":[{"score":100.0,"acceptance":"Auto-Match","action":"EID","actionText":"Link","rule":  {"canMatchSameSystem":true,"systemMatchType":0,"lowScore":80,"highScore":95,"actionText":""}}],"Count":1}

By adding a filter and unwind to the fuzzy:

[{"$find":{"Entity.LegalForm.EntityLegalFormCode":"8888"}}, {"$fuzzy":{"Index":"CompanyNameIdx2"}}, {"$filter":{"Count":{"$gt":0}}}, {"$unwind":{"path":"Matches"}}]

Would now produce:
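The flattening behaviour of unwind can be sketched in Python: one output document is produced per element of the named array, with the array property replaced by that single element (an illustration of the semantics, not monsterDB code):

```python
def unwind(docs, path):
    # One output document per element of the named array; the array
    # property is replaced by that single element
    for doc in docs:
        for element in doc.get(path, []):
            out = dict(doc)
            out[path] = element
            yield out

docs = [{"_id": "3", "Matches": [{"score": 100.0}, {"score": 85.0}], "Count": 2}]
pairs = list(unwind(docs, "Matches"))
print(len(pairs))           # 2 documents, one per match
print(pairs[0]["Matches"])  # {'score': 100.0}
```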


getRelated({...})

Rather like lookup, but the documents looked up are the related documents as defined by the relationships in the collection; a selective filter can be applied.

  • relType – filters the relationship type
  • _idName – the name of the ID field in the related documents
  • into – the name of the list to add to the document
  • recurse – the level to recurse to
  • direction – the direction of travel 1 (down) or -1 (up)

lookup({from: "...", localField: "...", foreignField: "...", as: "..."})

This stage will look up a document in another (or the same) collection and include the matched document as a new object in the current record. If the system fails to find a document in the looked-up collection then the object is simply not appended. This is akin to a relational outer join.

  • “localField” – the field in the source document,
  • “from” – signifies the collection to look into,
  • “foreignField” – the field in the looked-up collection that we should match localField against, and
  • “as” – the name under which the returned values are stored.

join({with: "...", on: {...}, into: "..."})

This stage will inner join to another (or the same) collection and produce output only where a match was found; the matched (to) document is appended to the source as a new property of the current document. If the system fails to find a document in the looked-up collection then the document will not be returned.

join({"with":"Exchange","on":{Currency: $CURRID},"into":"ExchangeRateToday"})
join({with: "sets", on: {"PRFL_NR": {$lower: $ProductID}}, as: "sets"})

with is the name of the collection; on is the expression to join on and must be a document {}, whose property names must be the names of properties in the joined collection (with); and into tells us where to put the data returned.
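The outer-join behaviour of lookup (matches appended when found, the document passed through untouched otherwise) can be sketched in Python with hypothetical People and Exchange data (the rates here are invented for illustration):

```python
# Hypothetical collections: the Exchange rates are invented for illustration
people = [{"LastName": "Smith", "Currency": "USD"},
          {"LastName": "Haynes", "Currency": "GBP"}]
exchange = [{"Currency": "USD", "rate": 1.25},
            {"Currency": "EUR", "rate": 1.10}]

def lookup(docs, from_docs, local_field, foreign_field, as_name):
    # Outer-join semantics: matched documents are appended under as_name;
    # when nothing matches, the source document passes through unchanged
    index = {}
    for f in from_docs:
        index.setdefault(f[foreign_field], []).append(f)
    for doc in docs:
        found = index.get(doc.get(local_field))
        out = dict(doc)
        if found:
            out[as_name] = found
        yield out

joined = list(lookup(people, exchange, "Currency", "Currency", "ExchangeRateToday"))
print(joined[0]["ExchangeRateToday"][0]["rate"])  # 1.25
print("ExchangeRateToday" in joined[1])           # False: GBP had no match
```

An inner join, as performed by the join stage, would instead drop the unmatched Haynes document entirely.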

Aggregating the Stream

bucket({groupBy: expression, boundaries: [0,1,2...], default: "", output: expression})

This stage will aggregate all incoming documents into buckets according to a numeric grouping.


  • groupBy is the expression used to bucket the values; in this case it is a single property of the document, but in reality it could be an expression too.
  • boundaries is a list of numerics; the lower values are inclusive, so values such as 999 and 0 will fall into the same bucket in this example.
  • default: values that fall outside the ranges in the boundaries will be assigned to the default bucket. This is useful for outliers.
  • output is the record produced for each bucket; the _id field of each will be the value specified as the lower boundary. Each property of the output will be evaluated by the parser and thus can include expressions.
Output from this example would look like this:
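The bucketing semantics can be sketched in Python with hypothetical salaries and boundaries: each value lands in the bucket whose lower boundary it reaches, and out-of-range values fall into the default bucket (this illustrates the semantics only, not monsterDB's implementation):

```python
from bisect import bisect_right

def bucket(docs, group_by, boundaries, default):
    # Each value lands in the bucket named after the highest lower
    # boundary it reaches; out-of-range values go to the default bucket
    buckets = {}
    for doc in docs:
        i = bisect_right(boundaries, doc[group_by]) - 1
        key = boundaries[i] if 0 <= i < len(boundaries) - 1 else default
        buckets.setdefault(key, []).append(doc)
    return buckets

docs = [{"salary": 1006}, {"salary": 2013}, {"salary": 2014}, {"salary": 9999}]
result = bucket(docs, "salary", [0, 1000, 2000, 3000], "other")
print(len(result[1000]), len(result[2000]), len(result["other"]))  # 1 2 1
```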

group({_id: expression, field: expression, field: expression})

Group is used to aggregate data by a set of dimensions in the document. The types of aggregation currently supported are:

$sum – summarise
$avg – average (mean)
$max – maximum numeric value
$min – minimum numeric value
$push – push value to a list (array)
$first – first item in a list
$last – last item in a list

Each of the operators above can be augmented by expressions.
For example to sum all the salaries in our base currency at the current exchange rate, by last name:


The use of the _id column is key here, as it defines the aggregate dimension to use; in the example above it is LastName, but it could also be an expression:


Where the group by expression will be a year, extracted from the year of the startDate field. In our example data set:


You could also decide not to group by an expression, for example:


Would produce the non-dimensional output:


Reducing the Stream


filter({query})

Filter is used beyond the match stage: as each document may now have been augmented with new values from lookups etc., we can filter on the current document values.
As an example, if we add the following filter stage to the example in the bucket:


The effect on the output would be as follows:


skip(n)

The skip stage will ignore a given number of documents. For this to be of any particular use, the user is recommended to ensure the data that skip is applied to is in a logical order; otherwise the skipped records will simply be in the natural order of the index used to recall the data from the collection, which is constant providing you don't change anything.
An example is:

sort({"ROWID":1}) | skip(1)

Using our data collection, this would produce the following output:

{"FirstName":"Bob","LastName":"Haynes","ROWID":2,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"16d39308-7f62-4414-a659-909623cb6f15"}
{"FirstName":"Robert","LastName":"Smith","ROWID":3,"salary":2013,"startDate":"Mar 4, 2013 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"2026e226-1165-427b-b784-ea450d04801f"}
{"FirstName":"Bob","LastName":"Smith","ROWID":4,"salary":2014,"startDate":"Mar 4, 2014 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"1d1f3717-28a2-4043-b0aa-26f6a1417d39"}

You will notice ROWID #1 is missing from our output.

sort({field: 1/-1, field: 1/-1....})

Sort determines the order in which values are passed to the next step of the pipeline; it can sort on numeric and non-numeric values in the database. Fields can be combined in the sort document to create a composite sort, for instance:

sort({"LastName": 1, "ROWID": -1})

This will sort ascending by LastName and descending by ROWID, yielding the following output:

{"FirstName":"Bob","LastName":"Haynes","ROWID":2,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"1225351c-b5b2-4e0a-93d3-fa51b5beb06f"}
{"FirstName":"Albi","LastName":"Haynes","ROWID":1,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"d3a77c1d-942a-4ff4-830b-98d4320c90eb"}
{"FirstName":"Bob","LastName":"Smith","ROWID":4,"salary":2014,"startDate":"Mar 4, 2014 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"b4c7c07f-7b9f-49a0-927e-96f5a6fb6adf"}
{"FirstName":"Robert","LastName":"Smith","ROWID":3,"salary":2013,"startDate":"Mar 4, 2013 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"b50ebba3-a08a-42e7-9ffb-04ff853847db"}

limit(n)

Limit is the opposite of skip: it limits the output to the first n values, where n is an integer.

sort({"ROWID":1}) | limit(1)

Using the same example as skip but switching to limit will produce the following:

{"FirstName":"Albi","LastName":"Haynes","ROWID":1,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"fd43e30d-b564-4f4c-b4ae-3d9602025dc6"}

Using skip and limit together:
Depending on the order of the operations, the results will differ. Skipping 1 and then limiting 1 will produce the 2nd record in the set; limiting 1 and then skipping 1 will produce an empty set, as you would expect.
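The difference in ordering can be demonstrated with a small Python sketch using islice for both stages:

```python
from itertools import islice

rows = [{"ROWID": i} for i in (1, 2, 3, 4)]

def skip(docs, n):
    # drop the first n documents, pass the rest through
    return islice(docs, n, None)

def limit(docs, n):
    # pass through only the first n documents
    return islice(docs, n)

a = list(limit(skip(iter(rows), 1), 1))  # skip(1) | limit(1)
b = list(skip(limit(iter(rows), 1), 1))  # limit(1) | skip(1)

print(a)  # [{'ROWID': 2}] – the second record
print(b)  # [] – the only record kept by limit was then skipped
```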


distinct()

Distinct will reduce the stream by removing duplicate documents. Please be aware that any slight difference between documents, such as property order, may cause them to be treated as distinct; to ensure correct behaviour, precede it with an evaluate statement to reduce the fields and guarantee their order.
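One way to make document comparison insensitive to property order is to deduplicate on a canonical serialisation, sketched here in Python (this is an illustration; the property-order caveat above is why the documentation recommends an evaluate step before distinct):

```python
import json

def distinct(docs):
    # Deduplicate on a canonical serialisation so that property order
    # inside a document cannot defeat the comparison
    seen = set()
    for doc in docs:
        key = json.dumps(doc, sort_keys=True)
        if key not in seen:
            seen.add(key)
            yield doc

docs = [{"a": 1, "b": 2}, {"b": 2, "a": 1}, {"a": 1, "b": 3}]
unique = list(distinct(docs))
print(len(unique))  # 2: the first two differ only in property order
```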


evaluate({...})

Evaluate is a special command that enables you to restructure the data in the stream. It requires a document whose properties are the output properties (named as you want them to appear) and whose values are either plain values or complex objects of functions around values. To refer to a property of the document from the preceding step, use the $propertyname syntax for the property value; there is no need to surround it in quotes, and it will be replaced with the value from the previous step at evaluation time. Functions such as $upper, $eq, $contains, $multiply etc. are always used around the property values on the right-hand side. See the section on operators.

evaluate({PRFL_NR: $ProductID, Name: $ProductName, categories: {$popFirst: $fsc}})

Here we see $ProductID is substituted into PRFL_NR, and categories will be the first array item from fsc.


Terminal Operations


out("collectionName")

Out is a largely terminal step in the pipeline, as it does not produce any output. The out stage writes the documents at the current point in the stream to a collection in the current database.

[{"$group":{"_id":"$LastName","totalSalary":{"$sum":"$salary"},"totalPeople":{"$sum":1}}}, {"$out":"Summary"}]

The output to the next stage or the screen will be empty (size=0); however, the collection Summary will contain the following records:


writeFile({...})

writeFile will output the result of the stream to a JSON-structured file; this file can then be read by any suitable program, or by readFile with a type of json.

Classifier Commands

The monsterDB engine now utilises the WEKA libraries from the Machine Learning Group at the University of Waikato.

Full instructions on the WEKA algorithms can be found on their website. This page covers their usage as embedded in the monsterDB core, including the new browser output mode.


arrf({definition})

The output from the arrf command will be in the JSON ARFF format.

  aggregate.aCollection.arrf({...definition...}) | ...optional further command

The command is part of the aggregate pipeline function set and can be used in the traditional (i.e. mongodb) format or in the piped format that operating-system users will be used to. arrf is used to prepare a set of objects from a collection; this can be the whole collection, as shown, or a subset obtained by preceding the arrf with a match filter (or fuzzy filter).

  aggregate.aCollection.match({ ... filter ... }) | arrf({...definition...}) | ... optional further command

Definition: this document object must be passed to arrf, and it can be empty. If populated, it must contain the types of the attributes you would like to control; normally, if an attribute is not found in this document, its type will be guessed by the arrf generator. If you would like to force the generator to use nominal, string, numeric or date for any of the fields, this is how you would do it:

.... | arrf({name: "string", dob: "date yyyy-MM-dd'T'HH:mm:ss.SSSZ", age: "numeric", hair: "nominal"}) | ...

The date format is optional, but recommended.

Nominal on its own as a string value will not seed the nominal values; if you wish to seed the nominal values with certain strings, it can be done like this:

.... | arrf({name: "string", dob: "date yyyy-MM-dd'T'HH:mm:ss.SSSZ", age: "numeric", hair: ["brown", "blonde", "red"]}) | ...

This seeds the nominal values; if other values are found in the data they will be appended to the list. This can be useful when your nominal values would otherwise end up as a singleton list, which is unusable by many classifiers (e.g. J48): you could add a dummy value to ensure at least 2 values are classified.


classifierBuild({...})

As the name suggests, classifierBuild takes arrf-format output and builds a model from it using training and testing sets, in the same way as would be done in R or the WEKA workbench.

Usage from CLI: 

An example of a classifierBuild is:

aggregate.gleif.match({}) | arrf() | classifierBuild({ "trees.J48": {numFolds: 10, className: "Entity.Solvency"} })

The example would take the output of the entire gleif collection, convert it to a json arrf format and then build a decision tree on the data using the J48 algorithm.

The options are optional but highly recommended. They are given in a document where each entry pertains to an algorithm, so you can produce multiple models from the same input set; the list of available algorithm names can be seen using the listClassifiers command.

Each classifier has the following options:

numFolds – how many folds (randomised training sets) to generate from the source data.

className – the attribute we are trying to classify – i.e. if we are aiming to predict the solvency of the company then this field would be the className. Note that dot notation is used because the data in the collection is nested: Solvency appears under Entity.

classifierBuild outputs a largely unreadable JSON document, which you should store as shown. The rest of the fields in the options will also be stored with the output, which means you can use an _id field to ensure that you keep only one version of the model:

aggregate.gleif.match({}) | arrf() | classifierBuild({ "trees.J48": {numFolds: 10, className: "Entity.Solvency", "_id": "model1"} }) | out("modelsCollection")

For the interested, the model can be viewed using a standard find on the collection:

db.modelsCollection.find({_id: "model1"})

would produce something like:

{"numFolds":10,"_id":"model1","className":"Entity.Solvency","classifier":"trees.J48","accuracy":100,"createDate":"Mar 25, 2020 6:08:05 PM","model":{"m_storedObjectArray": [ ... ],"m_isCompressed":false,"m_isJython":false}}

It is often important to inspect the stored model to assess the accuracy value, which is added after the training set is tested.


classifierPredict({...})

Now that we have a model trained and stored in the database, we can use it to predict an outcome for a new dataset. This is done using the classifierPredict function:
classifierPredict({"modelFilter": {"from": "collectionName", filter: { ... filter ... } } } )
classifierPredict({"numFolds":10,"_id":"model1","className":"Entity.Solvency","classifier":"trees.J48","accuracy":100,"createDate":"Mar 25, 2020 6:08:05 PM","model":{"m_storedObjectArray": [ ... ],"m_isCompressed":false,"m_isJython":false}})
As the output from classifierBuild cannot be piped into classifierPredict (the input to classifierPredict is your data), it needs to be told where to obtain the trained model.
Using modelFilter:
This can be done by passing a modelFilter document containing two fields, from and filter: from is the collection name in the same database, and filter is in the format used by match and find and can include complex arguments. The result must be a single document; subsequent documents are ignored (the first is used).
Passing the model on the command line:
Alternatively, you can pass the document containing the model, as per the classifierBuild output above.
An example is as follows:
aggregate.gleif.match({}) | arrf({})  | classifierPredict({"modelFilter": {"from": "modelCollection", filter: {_id: "model1"}}})


{... data ..., "Class":"SOLVENT"},

For each object passed to classifierPredict, it will be returned with a Class field appended; this is the prediction.


classifierTree({...})

Classifier tree is used to produce human-readable decision trees. Not all classifiers in WEKA will work yet, but any that produce dot-format output should be fine.

aggregate.collection.classifierTree({"modelFilter": {"from": "collection name", filter: { .... }}})

Model filter uses from to select the collection in the same database and filter to select a single object in that collection (the first is used).


listClassifiers()

Lists the usable classifiers in the system:



{"value": "trees.HoeffdingTree"}, 
{"value": "trees.J48"},
{"value": "trees.M5P"},
{"value": "pmml.consumer.TreeModel"},
{"value": "meta.CostSensitiveClassifier"},
{"value": "meta.RandomizableFilteredClassifier"},
{"value": "meta.CVParameterSelection"},
{"value": "trees.RandomTree"},
{"value": "trees.REPTree"},
{"value": "trees.LMT"},
{"value": "meta.AttributeSelectedClassifier"},
{"value": "meta.FilteredClassifier"},
{"value": "bayes.BayesNet"},
{"value": "misc.InputMappedClassifier"}