September 14, 2021
Cribl Stream enables you to get multiple data formats into your analytics tools quickly. You can use Stream to ingest data from any observability source or REST API, and easily transform and route that data to whatever analytics tool you choose.
This blog will walk through how I used Stream to ingest 3D printer data from OctoPrint, using Stream’s REST and filesystem Collectors, and how I then transformed and routed that data to OpenSearch. The basic principles are the same for Repetier-Server or Ultimaker.
OctoPrint is a print server with a streamlined web UI, designed for hobbyist 3D printer users. It lets users upload G-code to their 3D printers remotely, and also allows you to do things like preheat your printer and get historical data on your prints.
Beyond the features a hobbyist might be most interested in, OctoPrint also has an extensive REST API. We can use the REST API to retrieve data on jobs, plugins, printer state, etc. We can also get the octoprint.log, which is OctoPrint’s main application log. This log contains information on everything happening while OctoPrint is running.
We will examine how to ingest printer state data via the API, and the OctoPrint logs via the filesystem. Then we will create a Pipeline to get these logs ready for ingestion into OpenSearch. Finally, we will route the logs to OpenSearch, and search and visualize them there.
Using the Stream REST collector, we can get data from OctoPrint’s REST API. As previously stated, we are going to get the printer State using GET /api/printer.
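A quick way to check what the Collector will see is to hit the same endpoint yourself. Here is a minimal sketch, assuming a host of octopi.local and an API key generated in OctoPrint’s settings (both are placeholders; OctoPrint authenticates API calls with an X-Api-Key header):

```python
import json
import urllib.request

def build_state_request(host: str, api_key: str) -> urllib.request.Request:
    """Build the GET /api/printer request the REST Collector issues."""
    return urllib.request.Request(
        f"http://{host}/api/printer",
        headers={"X-Api-Key": api_key},  # OctoPrint's API authentication header
    )

# Hypothetical host and key -- substitute your own OctoPrint instance.
req = build_state_request("octopi.local", "YOUR_API_KEY")

# To actually fetch the printer state, uncomment:
# with urllib.request.urlopen(req) as resp:
#     state = json.load(resp)
#     print(state["state"]["text"])
```

The response is JSON containing the printer’s state flags and temperatures, which is exactly the payload the Collector will hand to our Pipeline.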
We are going to add one last configuration under Fields (Metadata). We will use this added field in our filter to route this collected data to OpenSearch.
collector = 'OctoPrintState'
Before we set up the Filesystem collector to ingest the octoprint.log, we will need to set up an Event Breaker to properly break the events when collecting them in Stream.
Create a new Ruleset named OctoPrint Ruleset and click + Add Rule.
Name the rule OctoPrintLog and, under Event Breaker Type, select Timestamp.
Click OK and save the Event Breaker.
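To see why a Timestamp breaker works here: octoprint.log is written by Python’s logging module, so each event begins with a timestamp like 2021-09-14 10:23:45,123, while continuation lines (tracebacks, for example) do not. A sketch of that breaking logic, with illustrative (not real) log lines:

```python
import re

# Events start wherever a "YYYY-MM-DD HH:MM:SS,mmm" timestamp begins a line.
TS = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}", re.MULTILINE)

def break_events(chunk: str) -> list:
    """Split a raw chunk into events, one per leading timestamp."""
    starts = [m.start() for m in TS.finditer(chunk)]
    ends = starts[1:] + [len(chunk)]
    return [chunk[a:b].rstrip("\n") for a, b in zip(starts, ends)]

# Illustrative sample -- the traceback lines stay with their event.
chunk = (
    "2021-09-14 10:23:45,123 - octoprint.server - INFO - Listening on 0.0.0.0:5000\n"
    "2021-09-14 10:23:46,456 - octoprint.plugin - ERROR - Plugin failed\n"
    "Traceback (most recent call last):\n"
    "  ...\n"
)
events = break_events(chunk)  # 2 events, not 4 lines
```

This is what the Timestamp Event Breaker does for us in Stream: multi-line entries are grouped into a single event instead of being split on every newline.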
Now that we have configured the Event Breaker, let’s set up the Filesystem collector to collect the
~/.octoprint/logs directory. Under Directory, enter the full path to these OctoPrint logs.
As with the REST Collector, add a field under Fields (Metadata), this time with the value octofilesystem. We will use this field to filter these logs in the Route.
Run a preview, and you will see events from octoprint.log, which are now correctly broken. Go ahead and save this as a sample file to use in our Pipeline creation.
Now I will create a pipeline in Stream to process the OctoPrint state information and
octoprint.log, and get it ready for ingestion in OpenSearch.
To create a new Pipeline, go to Pipelines → + Pipeline. I named mine OctoPrint-API-State.
The first thing we need to do to get our data ready to send to OpenSearch is to specify the index where we want our data. We can do this by setting the
__index field using an Eval function.
Set the Name to __index and the Value Expression to 'octoprintstate'.
Let’s take a look at the sample data we saved, by clicking Preview > Simple. If you click the gear icon and select Show Internal Fields, you can see the
__index field we just created. You might also notice that the
_raw field contains JSON. OpenSearch needs the nested fields extracted to use them. We can do this by using the Parser function.
The Parser has now extracted the fields out of
_raw, and they are ready to be sent to OpenSearch.
OpenSearch doesn’t accept field names that start with an underscore, and we don’t need _raw anymore. We will get rid of _raw with a final Eval function.
_raw is gone, the event is ready to send on to OpenSearch.
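The whole Pipeline amounts to three small steps. Here is a sketch of the equivalent transformation in plain Python (the event shape is illustrative; Stream’s Eval and Parser functions do this natively):

```python
import json

def transform(event: dict) -> dict:
    """Mimic the Pipeline: set __index, parse _raw as JSON, drop _raw."""
    # Eval: route this event to the octoprintstate index.
    event["__index"] = "octoprintstate"
    # Parser: lift the JSON fields in _raw up to the event itself.
    event.update(json.loads(event["_raw"]))
    # Eval: remove _raw, which OpenSearch would reject.
    del event["_raw"]
    return event

# Illustrative printer-state event, as collected from GET /api/printer.
event = {"_raw": '{"state": {"text": "Operational"}, "temperature": {"bed": {"actual": 60.1}}}'}
out = transform(event)
```

After this, the event carries the extracted state and temperature fields plus the internal __index routing field, with no _raw left behind.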
Let’s create another pipeline for the
octoprint.log we saved as a sample earlier. Open it on the right by clicking Preview > Simple on its row.
We can see that, as with the API state data, our data is in
_raw, and OpenSearch will complain about fields starting with an underscore. We can fix this by adding a Rename function to rename the _raw field to something OpenSearch will accept.
Now we need to specify the OpenSearch index we want to send to, by setting the
__index field with another Eval function: __index under Name, and the index name we want as the Value Expression.
Now, this data is ready to be routed to OpenSearch.
OpenSearch is a fork of Elasticsearch under the covers, and it works great with Stream’s Elasticsearch Destination. Point the Destination’s Bulk API URL at the _bulk API of our OpenSearch instance. Type should be _doc, as that is what OpenSearch prefers.
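Under the hood, the Elasticsearch Destination writes newline-delimited JSON to _bulk: an action line naming the index and type, then the document itself. A sketch of that payload format (the index name and event are illustrative):

```python
import json

def to_bulk(events: list, index: str) -> str:
    """Render events as an Elasticsearch/OpenSearch _bulk payload (NDJSON)."""
    lines = []
    for event in events:
        # Action line: target index, with _doc as the document type.
        lines.append(json.dumps({"index": {"_index": index, "_type": "_doc"}}))
        lines.append(json.dumps(event))
    return "\n".join(lines) + "\n"  # _bulk requires a trailing newline

payload = to_bulk([{"state": "Operational"}], "octoprintstate")
```

Because our Pipelines set __index on each event, Stream fills in the _index value per event rather than using one fixed index for everything.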
Set the default Index to octoprint. In the Pipelines we created, we overwrote the __index field, so events will be routed to the indexes we set there instead.
Your settings should look something like this:
One last important setting when sending data to OpenSearch is the Elastic version; set it to match the Elasticsearch API version your OpenSearch instance is compatible with.
Once we’ve configured the Destination, we can set up a Route to send the State data we collected with the REST API Collector to OpenSearch.
Set the Filter to collector=='OctoPrintState', the field we added as metadata in our Collector configuration.
Then select the OctoPrint-API-State Pipeline we created in a previous step.
Add another Route for the filesystem logs, filtering on the octofilesystem field, and select the OctoPrint-Filesystem Pipeline we created in a previous step.
OpenSearch needs minimal configuration, as we have done most of the work on the Stream side. Once we have sent data to OpenSearch, we can set up an index pattern to analyze the data. To create an index pattern:
Since we sent data to the octoprintstate index, we should see that index show up in the list. Type octoprintstate* as the index pattern name, and click Next.
Select @timestamp in the timestamp dropdown, and click Create index pattern.
You should now see your index pattern with all of the fields listed because we extracted them in Stream previously.
You can now go to Discover and search the data by filtering on the new index pattern.
Are you interested in routing OctoPrint logs, or any other data, to OpenSearch? It’s easy to get started using Stream to route and transform your logs: sign up for Cribl.Cloud and instantly provision an instance of Stream Cloud. Once connected to data sources, you can process up to 1TB per day for free!