API Scraping Using Cribl And Setting Up a Notification Assistant

December 18, 2023

Cribl Stream is awesome at routing your server logs and making your job easier, but could it help you outside of work and potentially make your personal life easier? The short answer is: Yes. I’ve personally used Stream to build a notification system to inform me when certain products go on sale or when fully booked appointments become available.

In this blog, I’m going to take this a step further and show you how to:

  • Scrape the web for new house listings in your area
  • Send this data to S3 for storage
  • Use Cribl Search to query the data
  • Get notified via text message when a house matching your specific criteria goes on the market

Setting Up the Source

We will use RapidAPI’s realtor endpoint to get the house listing information we will feed into Stream. After signing up for a free RapidAPI account and getting your API key, you can set up the following REST Collector Source:

Note that you need to use a JSON array Event Breaker, and you can change the Collect POST Body within the Collector to look for house listings in your area. Refer to the RapidAPI docs for more info. I’ve also scheduled this job to query for new listings every 3 hours. The JSON config of this Source is in the Appendix.
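To make the Collector settings a bit more concrete, here is a rough Python sketch of the request it issues and of the schedule it runs on. The URL, header names, body fields, and cron hour field are copied from the Appendix config; the function names are hypothetical, and the request is only built, never sent.

```python
import json
import urllib.request

# Endpoint and host header value are taken from the Collector config in the Appendix.
COLLECT_URL = "https://realtor.p.rapidapi.com/properties/v3/list"
RAPIDAPI_HOST = "realtor.p.rapidapi.com"


def build_listing_request(api_key: str, postal_code: str, limit: int = 200) -> urllib.request.Request:
    """Build (but do not send) a POST request like the one the Collector issues."""
    body = {
        "limit": limit,
        "offset": 0,
        "postal_code": postal_code,
        "status": ["for_sale", "ready_to_build"],
        "sort": {"direction": "desc", "field": "list_date"},
    }
    headers = {
        "X-RapidAPI-Host": RAPIDAPI_HOST,
        "X-RapidAPI-Key": api_key,
        "Content-Type": "application/json",
    }
    return urllib.request.Request(
        COLLECT_URL,
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )


def scheduled_hours(hour_field: str = "2-23/3") -> list[int]:
    """Expand a cron hour field of the form 'start-end/step' into the hours it matches."""
    span, step = hour_field.split("/")
    start, end = (int(part) for part in span.split("-"))
    return list(range(start, end + 1, int(step)))


if __name__ == "__main__":
    request = build_listing_request("<API_KEY_FROM_RAPID_API>", "53703")
    print(request.get_method(), request.full_url)
    print(scheduled_hours())
```

Changing `postal_code` is the equivalent of editing the Collect POST Body for your own area, and `scheduled_hours()` lists the hours of the day at which the cron schedule in the Appendix fires.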

Setting Up the Routes

Once you have verified the Collector is working and you’re accessing the house listings via Stream, you can set up the Routes and transform that data. I currently have 2 Routes: one to send data to S3 and the other to set up my notifications.

These Routes are pretty standard. House listing data from the Collector first goes through the notification assistant Route (Realtor Notification) and then the S3 Route (Realtor S3). Note that the Final flag is not set on the first Route, meaning the same data goes through both Routes.
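As a simplified illustration of what the Final flag does, the sketch below walks an event through an ordered list of routes. This is a toy model, not Stream's routing engine: the `Route` class, the always-true filters, and the `routes_for` helper are all assumptions made up for the example.

```python
from dataclasses import dataclass
from typing import Callable

Event = dict


@dataclass
class Route:
    name: str
    matches: Callable[[Event], bool]  # stand-in for the Route's filter expression
    final: bool                       # stand-in for the Final flag


def routes_for(event: Event, routes: list[Route]) -> list[str]:
    """Return the names of the Routes an event is delivered to, in order."""
    delivered = []
    for route in routes:
        if not route.matches(event):
            continue
        delivered.append(route.name)
        if route.final:
            break  # a Final Route consumes the event; later Routes never see it
    return delivered


if __name__ == "__main__":
    routes = [
        Route("Realtor Notification", lambda e: True, final=False),
        Route("Realtor S3", lambda e: True, final=True),
    ]
    print(routes_for({"property_id": "example"}, routes))
```

With Final off on the first route, the event reaches both routes; flipping it on would stop the event at Realtor Notification and nothing would reach S3.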

Setting Up the S3 Pipeline and the Redis Database

The Collector is set up to get listings every 3 hours. This means that most of the listings returned by each run will be ones we have already seen. To combat that, I’m using a Cloud Redis Database (an on-prem Redis DB could also be used) to save the property ID and the listing price. This way, every time the Collector runs, each house listing is checked against the Redis database. If the property ID exists in the Redis DB and the listing price has not changed, that event is dropped. As a result, only new listings, or listings with a new listing price, make it to S3. Now, let’s take a look at the Pipeline:

The first two Functions will only parse _raw and the description field. You can parse/remove other fields, depending on your use case and what you need to keep track of.

The third Function checks whether the property_id exists within Redis and will either return a value if it exists or return a null. We are setting that value to redis_listing_price.

In the following Function, we drop any event whose current list_price equals the one saved within Redis. This means the only listings that make it through are the new ones or the ones with a changed list price.

In the fifth Function, we add {property_id:list_price} to the Redis DB to keep track of it.

Finally, I have an Eval Function to remove fields I don’t care about.
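The get / drop / set sequence above can be sketched in a few lines of Python. This is only an approximation of the Pipeline logic: a plain dict stands in for the Redis database, and `process_listing` is a hypothetical name. Values are stored as strings because that is how Redis returns them.

```python
from typing import Optional


def process_listing(event: dict, store: dict[str, str]) -> Optional[dict]:
    """Return the event if it is new or re-priced, otherwise None (dropped)."""
    key = str(event["property_id"])
    price = str(event["list_price"])  # Redis stores values as strings
    previous = store.get(key)         # GET: None for a property we have not seen
    if previous == price:
        return None                   # Drop: price unchanged since the last run
    store[key] = price                # SET property_id -> list_price
    return event


if __name__ == "__main__":
    seen: dict[str, str] = {}
    listing = {"property_id": "M123", "list_price": 450000}
    print(process_listing(listing, seen))  # first sighting: kept
    print(process_listing(listing, seen))  # same price: dropped
    print(process_listing({**listing, "list_price": 439000}, seen))  # price change: kept
```

The sketch compares strings explicitly. The Drop filter in the Appendix uses JavaScript's loose `==`, which coerces a numeric string and a number before comparing, so a stored price and a numeric list_price with the same value compare equal there.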

Setting up the Notification Assistant Pipeline and Destination

The goal of the notification assistant is to send an SMS text message when a house with your criteria goes on the market. To do that, I have set up the following Pipeline:

The first two Parser Functions are identical to the ones in the S3 Pipeline, just parsing the _raw and description fields.

The third Function drops any listings that don’t meet the filter criteria. In this scenario, we only look for single-family homes or townhomes priced between $400k–$500k and sized between 1400–1500 sqft.
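For readers who want to adapt the criteria, here is a hedged Python equivalent of the Drop filter from the Appendix. The field names (`type`, `list_price`, `sqft`) and thresholds come from that filter expression; the function name is hypothetical. Like the original expression, the bounds are strict, so a listing at exactly $400,000 or 1,400 sqft does not match.

```python
WANTED_TYPES = {"single_family", "townhomes"}


def matches_criteria(listing: dict) -> bool:
    """True when a listing should trigger a notification (i.e. should NOT be dropped)."""
    price = listing.get("list_price")
    sqft = listing.get("sqft")
    if price is None or sqft is None:
        return False  # a missing field fails the comparisons, so the listing is dropped
    return (
        listing.get("type") in WANTED_TYPES
        and 400_000 < price < 500_000
        and 1_400 < sqft < 1_500
    )


if __name__ == "__main__":
    print(matches_criteria({"type": "single_family", "list_price": 450000, "sqft": 1450}))
    print(matches_criteria({"type": "townhomes", "list_price": 400000, "sqft": 1450}))
```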

The fourth Function is checking if this property exists within the Redis DB. In the fifth Function, the listing will be dropped if the property already exists within Redis and its price hasn’t changed.

In the last Function, we are transforming the whole event into the syntax our SMS service provider requires since this is what is sent out of Cribl. In this scenario, I’m using Twilio, and after looking through their docs, I found that we would need to use the above syntax.
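The payload is a form-encoded string with `Body`, `From`, and `To` fields, matching the Eval Function in the Appendix. As a sketch of how such a string can be assembled safely outside of Stream, the Python below percent-encodes each value; in form encoding, a literal `+` or `&` inside a value would otherwise be misread as a space or a field separator. The function name and the phone numbers are placeholders.

```python
from urllib.parse import parse_qs, urlencode


def twilio_form_body(href: str, from_number: str, to_number: str) -> str:
    """Build an application/x-www-form-urlencoded payload with Body, From, and To."""
    fields = {
        "Body": f"New home hit the market: {href}",
        "From": from_number,
        "To": to_number,
    }
    return urlencode(fields)  # percent-encodes '+', '&', ':', '/' and turns spaces into '+'


if __name__ == "__main__":
    body = twilio_form_body(
        "https://www.realtor.com/example?a=1&b=2",  # hypothetical listing URL
        "+15555550100",                             # placeholder sender number
        "+15555550101",                             # placeholder recipient number
    )
    print(body)
    print(parse_qs(body)["Body"][0])  # decodes back to the original message
```

If a message arrives truncated or with a mangled phone number, unencoded characters in the payload are a likely place to look.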

Note that you could send this data anywhere using the Webhook Destination, so you’re not limited to Twilio. The following is my Twilio Destination:

After setting up your Twilio trial account, you are given a phone number, SID, and access key. Using this data, you can set up the above Destination.
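For reference, the Webhook Destination amounts to an HTTP POST with basic authentication. The sketch below builds the equivalent request in Python without sending it. The URL pattern, content type, and use of the account SID as the username come from the Destination config in the Appendix; `build_twilio_request` and its parameter names are hypothetical.

```python
import base64
import urllib.request


def build_twilio_request(account_sid: str, secret: str, form_body: str) -> urllib.request.Request:
    """Build (but do not send) the POST that delivers one SMS payload."""
    url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
    # HTTP basic auth: base64("username:password"), with the account SID as the username.
    token = base64.b64encode(f"{account_sid}:{secret}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=form_body.encode("utf-8"),
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        method="POST",
    )


if __name__ == "__main__":
    request = build_twilio_request("<TWILIO_ACCOUNT_SID>", "<TWILIO_SECRET_KEY>", "Body=hello")
    print(request.get_method(), request.full_url)
```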

And voilà, once a house matching your criteria comes to the market, you will get a text message.

S3 Destination

To store the data cheaply and to be able to query it using Cribl Search, I have set up an S3 Destination to receive and export all the listings:

I won’t review the AWS S3 setup, but here is my Cribl Destination config. Listings go to the <amproject> bucket, and the partitioning expression writes the data under the following path: /realtor.com/YEAR/MONTH/DAY/HOUR
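To show how the partitioning expression maps an event time to a path, here is a small Python approximation. The format string `%Y/%m/%d/%H` and the fallback to the current time are taken from the `partitionExpr` in the Appendix. Formatting in UTC is an assumption of this sketch, and `partition_prefix` is a hypothetical name.

```python
import time
from datetime import datetime, timezone
from typing import Optional


def partition_prefix(dest_path: str, event_time: Optional[float] = None) -> str:
    """Return '<dest_path>/YYYY/MM/DD/HH' for an epoch timestamp given in seconds."""
    # Fall back to "now" when the event carries no _time, as the expression does.
    ts = event_time if event_time else time.time()
    stamp = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y/%m/%d/%H")  # UTC assumed
    return f"{dest_path}/{stamp}"


if __name__ == "__main__":
    # 1702911600 is 2023-12-18 15:00:00 UTC
    print(partition_prefix("realtor.com", 1702911600))  # realtor.com/2023/12/18/15
```

Partitioning by hour keeps each Collector run in its own folder, which makes it easy to point a search at a narrow time range.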

Note that you’re not limited to only AWS S3. You could use Azure Blob Storage or Google Cloud Storage and query the data using Cribl Search.

Utilizing Search

Now that this data is going to S3, you can use Cribl Search to query that data. You must set up the dataset and the S3 connection within Search.

First, within Data > Dataset Providers, you need to set up the connection between Search and S3.

Next, within Data > Datasets, you need to tell Search about the data you want to query. You need to link your dataset provider, bucket path, and region. In the Processing tab on the left, you need to tell Search how to read your data and what format it is in.

After you set up the datasets and the datatypes, you can run some searches!

I’ve been running this only for a limited time and only for the Madison, WI area to help me find a home. But this could be set up to run in multiple different markets for much longer. Cribl makes this process very automated and customizable so that the same can be done with any other dataset. We offer free and instant access to Cribl Search over Cribl.Cloud – what are you waiting for?

APPENDIX:

Collector Source Config:

{
  "type": "collection",
  "ttl": "4h",
  "removeFields": [],
  "resumeOnBoot": false,
  "schedule": {
    "cronSchedule": "0 2-23/3 * * *",
    "maxConcurrentRuns": 1,
    "skippable": true,
    "run": {
      "rescheduleDroppedTasks": true,
      "maxTaskReschedule": 1,
      "logLevel": "info",
      "jobTimeout": "0",
      "mode": "run",
      "timeRangeType": "relative",
      "timestampTimezone": "UTC",
      "expression": "true",
      "minTaskSize": "1MB",
      "maxTaskSize": "10MB"
    },
    "enabled": true
  },
  "streamtags": [],
  "workerAffinity": false,
  "collector": {
    "conf": {
      "discovery": {
        "discoverType": "none"
      },
      "collectMethod": "post_with_body",
      "pagination": {
        "type": "none"
      },
      "authentication": "none",
      "timeout": 0,
      "useRoundRobinDns": false,
      "disableTimeFilter": false,
      "rejectUnauthorized": false,
      "safeHeaders": [],
      "retryRules": {
        "type": "backoff",
        "interval": 1000,
        "limit": 5,
        "multiplier": 2,
        "codes": [
          429,
          503
        ],
        "enableHeader": true
      },
      "collectUrl": "\"https://realtor.p.rapidapi.com/properties/v3/list\"",
      "collectRequestHeaders": [
        {
          "name": "X-RapidAPI-Host",
          "value": "\"realtor.p.rapidapi.com\""
        },
        {
          "name": "X-RapidAPI-Key",
          "value": "<API_KEY_FROM_RAPID_API>"
        },
        {
          "name": "content-type",
          "value": "\"application/json\""
        }
      ],
      "collectBody": "{\n limit: 200,\n offset: 0,\n postal_code: '53703',\n status: [\n 'for_sale',\n 'ready_to_build'\n ],\n sort: {\n direction: 'desc',\n field: 'list_date'\n }\n }"
    },
    "destructive": false,
    "type": "rest"
  },
  "input": {
    "type": "collection",
    "staleChannelFlushMs": 10000,
    "sendToRoutes": true,
    "preprocess": {
      "disabled": true
    },
    "throttleRatePerSec": "0",
    "breakerRulesets": [
      "JSON ARRay"
    ]
  },
  "savedState": {},
  "id": "Realtor"
}

Realtor_Pipeline for Routing Data to S3:

{
  "id": "Realtor_Pipeline",
  "conf": {
    "output": "default",
    "streamtags": [],
    "groups": {},
    "asyncFuncTimeout": 1000,
    "functions": [
      {
        "filter": "true",
        "conf": {
          "mode": "extract",
          "type": "json",
          "srcField": "_raw",
          "keep": [],
          "remove": [
            "__typename"
          ],
          "fieldFilterExpr": "value !== null"
        },
        "id": "serde",
        "description": "Parse out the raw field"
      },
      {
        "filter": "true",
        "conf": {
          "mode": "extract",
          "type": "json",
          "srcField": "description",
          "remove": []
        },
        "id": "serde",
        "description": "Parse out the description field"
      },
      {
        "filter": "true",
        "conf": {
          "commands": [
            {
              "command": "get",
              "keyExpr": "property_id",
              "outField": "redis_listing_price"
            }
          ],
          "deploymentType": "standalone",
          "authType": "none",
          "maxBlockSecs": 60,
          "tlsOptions": {
            "rejectUnauthorized": true
          },
          "url": "<YOUR_REDIS_URL>"
        },
        "id": "redis",
        "disabled": false,
        "description": "Within redis, I'm saving {property_id:list_price} in order to keep track of the list price change. This function will get the last list price of a specific property. If it's a new listing, this will return null"
      },
      {
        "filter": "redis_listing_price == list_price",
        "conf": {},
        "id": "drop",
        "description": "Dropping events where there were no changes in listing price since the last query"
      },
      {
        "filter": "true",
        "conf": {
          "commands": [
            {
              "command": "set",
              "keyExpr": "property_id",
              "argsExpr": "list_price"
            }
          ],
          "deploymentType": "standalone",
          "authType": "none",
          "maxBlockSecs": 60,
          "tlsOptions": {
            "rejectUnauthorized": true
          },
          "url": "<YOUR_REDIS_URL>"
        },
        "id": "redis",
        "disabled": false,
        "description": "If there was a change in the list price, we are saving the new list price to redis"
      },
      {
        "filter": "true",
        "conf": {
          "remove": [
            "host",
            "cribl_breaker",
            "cribl_pipe",
            "description"
          ]
        },
        "id": "eval",
        "description": "Removing certain fields that we don't care about"
      }
    ]
  }
}

Realtor_Notification Pipeline for Routing Data to Twilio (SMS Vendor):

{
  "id": "Realtor_Notification",
  "conf": {
    "output": "default",
    "streamtags": [],
    "groups": {},
    "asyncFuncTimeout": 1000,
    "functions": [
      {
        "id": "serde",
        "filter": "true",
        "disabled": false,
        "conf": {
          "mode": "extract",
          "type": "json",
          "srcField": "_raw",
          "remove": [
            "__typename"
          ],
          "fieldFilterExpr": "value !== null"
        },
        "description": "Parsing _raw field"
      },
      {
        "id": "serde",
        "filter": "true",
        "disabled": false,
        "conf": {
          "mode": "extract",
          "type": "json",
          "srcField": "description"
        },
        "description": "Parsing description field"
      },
      {
        "conf": {},
        "id": "drop",
        "filter": "!((type == \"single_family\" || type == \"townhomes\") && list_price > 400000 && list_price < 500000 && sqft > 1400 && sqft < 1500)",
        "disabled": false,
        "description": "Only looking for single family or townhomes between the $400k & $500k price and within 1400-1500 sqft. Dropping any other listings"
      },
      {
        "filter": "true",
        "conf": {
          "commands": [
            {
              "outField": "redis_listing_price",
              "command": "get",
              "keyExpr": "property_id"
            }
          ],
          "deploymentType": "standalone",
          "authType": "none",
          "maxBlockSecs": 60,
          "tlsOptions": {
            "rejectUnauthorized": true
          },
          "url": "<YOUR_REDIS_URL>"
        },
        "id": "redis",
        "disabled": false,
        "description": "checking whether I have already come across this property"
      },
      {
        "conf": {},
        "id": "drop",
        "filter": "redis_listing_price == list_price",
        "disabled": false,
        "description": "If I have come across this property and the price is still the same, drop it"
      },
      {
        "filter": "true",
        "conf": {
          "keep": [
            "_raw"
          ],
          "remove": [
            "*"
          ],
          "add": [
            {
              "disabled": false,
              "name": "_raw",
              "value": "`Body=New home hit the market: ${href}&From=<TWILIO_PHONE_NUMBER>&To=<YOUR_PHONE_NUMBER>`"
            }
          ]
        },
        "id": "eval",
        "disabled": false
      }
    ]
  }
}

Twilio Destination:

{
  "systemFields": [
    "cribl_pipe"
  ],
  "streamtags": [],
  "method": "POST",
  "format": "custom",
  "keepAlive": true,
  "concurrency": 5,
  "maxPayloadSizeKB": 4096,
  "maxPayloadEvents": 1,
  "compress": false,
  "rejectUnauthorized": true,
  "timeoutSec": 60,
  "flushPeriodSec": 1,
  "useRoundRobinDns": false,
  "failedRequestLoggingMode": "none",
  "safeHeaders": [],
  "onBackpressure": "block",
  "authType": "basic",
  "tls": {
    "disabled": true
  },
  "customSourceExpression": "_raw",
  "customDropWhenNull": false,
  "customEventDelimiter": "\\n",
  "customContentType": "application/x-www-form-urlencoded",
  "customPayloadExpression": "`${events}`",
  "id": "twilio",
  "type": "webhook",
  "url": "https://api.twilio.com/2010-04-01/Accounts/<TWILIO_ACCOUNT_SID>/Messages.json",
  "username": "<TWILIO_ACCOUNT_SID>",
  "password": "<TWILIO_SECRET_KEY>"
}

S3 Destination:

{
  "systemFields": [],
  "streamtags": [],
  "awsAuthenticationMethod": "manual",
  "signatureVersion": "v4",
  "reuseConnections": true,
  "rejectUnauthorized": true,
  "enableAssumeRole": false,
  "durationSeconds": 3600,
  "stagePath": "$CRIBL_HOME/state/outputs/staging",
  "addIdToStagePath": true,
  "objectACL": "private",
  "removeEmptyDirs": true,
  "partitionExpr": "C.Time.strftime(_time ? _time : Date.now()/1000, '%Y/%m/%d/%H')",
  "format": "json",
  "baseFileName": "`CriblOut`",
  "fileNameSuffix": "`.${C.env[\"CRIBL_WORKER_ID\"]}.${__format}${__compression === \"gzip\" ? \".gz\" : \"\"}`",
  "maxFileSizeMB": 32,
  "maxOpenFiles": 100,
  "onBackpressure": "block",
  "maxFileOpenTimeSec": 300,
  "maxFileIdleTimeSec": 30,
  "maxConcurrentFileParts": 4,
  "verifyPermissions": true,
  "compress": "none",
  "emptyDirCleanupSec": 300,
  "id": "Realtor",
  "type": "s3",
  "bucket": "amproject",
  "destPath": "\"realtor.com\"",
  "region": "us-west-2",
  "awsApiKey": "<ACCESS_KEY>",
  "awsSecretKey": "<SECRET_KEY>"
}

 


 
