As a consulting engineer at AL, my bread and butter for the last three years has been the likes of Terraform, Ansible and Jenkins. These are all tools which are fairly simple in principle, but have plenty of quirks and unintended uses which can keep an engineer learning/engaged/frustrated for some time. For a couple of different client projects I had been using these to build and automate a variety of things in Microsoft Azure – mostly based around virtual machines in a fairly ‘datacentre-like’ setup. So when the opportunity arose to work on some serverless data pipeline projects using Google Cloud at a major bank, I eagerly took the plunge into the unknown.

The first project, a data platform aggregating information on known vulnerabilities from a variety of public and private sources, I ended up doing mostly…Terraform. Annoyingly, sometimes people just want you to do what you’re good at. But I also did a lot of work making CI/CD pipelines using Google’s Cloud Build, as well as touching on some Google Kubernetes Engine cluster configuration and Helm deployments.

The real fun came when I was seconded to a separate security-related project, focussed on user behaviour analytics. An existing third party-hosted tool was deemed too expensive and opaque, so they wanted to bring the ability to spot potentially nefarious insider activity in-house – or rather, in-cloud to Google Cloud.

I was tasked with designing and creating a proof of concept that could demonstrate some behaviour baselining and anomaly detection when fed a set of log data (completely fabricated of course, for security reasons). After a few chats with one of their SMEs, we decided that Tanium logs would be a good place to start. These capture login events to domain-connected devices across the organisation, containing some useful elements such as usernames, login timestamps, hostnames and IP addresses. The idea was to broadly replicate what the existing system did: consume and parse the data, and calculate some ‘baseline’ behaviour for each identified user using a data clustering technique. Arguably, depending on who you ask, this qualifies as unsupervised machine learning (ooh), but really it’s just fairly simple statistics that anyone can understand. It doesn’t really matter either way, because as with everything there’s a Python module that does it for you.

The first task was to create a data pipeline which would group the logs by user, get a list of login times for each, and apply a DBSCAN (density-based spatial clustering of applications with noise, to give its full chin-stroking name) algorithm to each user's list of timestamps (converted into a simpler measure of 'minute of the week'). This basically looks for dense clusters of data points, excludes any outliers, and returns the upper and lower bounds of each cluster. Taking a suitable time gap (call it a grace period) either side of these, the pipeline returns a list of time windows over the working week in which each user typically logs in.
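To make that concrete, here is a minimal sketch of such a clustering step using scikit-learn's DBSCAN. The function name, parameter values and grace period are illustrative, not the PoC's actual code:

```python
import numpy as np
from sklearn.cluster import DBSCAN

def get_cluster_boundaries(minutes, eps=30, min_samples=5, grace=15):
    """Cluster a user's minute-of-week login times with DBSCAN and
    return a list of (start, end) windows, padded by a grace period."""
    X = np.asarray(minutes).reshape(-1, 1)   # DBSCAN expects 2-D input
    labels = DBSCAN(eps=eps, min_samples=min_samples).fit(X).labels_
    windows = []
    for label in sorted(set(labels) - {-1}):  # label -1 marks noise/outliers
        pts = X[labels == label, 0]
        windows.append((pts.min() - grace, pts.max() + grace))
    return windows

# A dense run of logins around 9am plus one stray late-night login:
windows = get_cluster_boundaries([530, 535, 540, 545, 550, 1000],
                                 eps=10, min_samples=3)
# the 530-550 cluster becomes one window; 1000 is discarded as noise
```

Tightening `eps` (the maximum gap between neighbouring points) and `min_samples` controls how readily logins merge into a single window versus being treated as outliers.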

Looking at some aggregated real data, it appeared that the distribution of user logins typically had two peaks – one in the morning, and a smaller one in the afternoon. A synthetic dataset was created to reflect this, generating login times for users based around two overlapping normal distributions with some randomly generated deviation from the means (9am and 1.30pm). The DBSCAN algorithm parameters can be tweaked to attempt to return one or two windows per working day for most users, in order to capture this pattern. A single user’s baseline may therefore, in plain language, be something like ‘8.30-9.45 and 12.34-13.45 on Monday, 8.20-9.30 on Tuesday…’ and so on. ‘New’ log data (in our case, just the latest 10% or so of generated logs) could then be checked against the user’s login windows (their baseline) to determine whether the login was at an unusual time and could, in combination with other risk factors, indicate suspicious insider activity. Later on I added some intentionally anomalous logins to test this.
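A generator for that kind of synthetic data might look like the following. The means match the morning and afternoon peaks described above, but the standard deviations, counts and afternoon-login probability are invented for illustration:

```python
import numpy as np

rng = np.random.default_rng(42)

def synth_logins(n_days=20):
    """Generate minute-of-week login times with a morning peak around
    9am (minute 540 of the day) and a smaller afternoon peak around
    1.30pm (minute 810), spread over weekdays."""
    minutes = []
    for _ in range(n_days):
        day = rng.integers(0, 5)              # weekday 0-4 (Mon-Fri)
        minutes.append(day * 1440 + rng.normal(540, 40))   # morning login
        if rng.random() < 0.6:                # afternoon peak is smaller
            minutes.append(day * 1440 + rng.normal(810, 50))
    # clamp into the valid minute-of-week range [0, 10079]
    return np.clip(np.array(minutes), 0, 7 * 1440 - 1)
```

Feeding a per-user sample like this into the clustering step should recover one or two windows per working day, mirroring the two-peak shape seen in the real data.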

I set about writing some Python code which would do the necessary transforms and calculations, with the aim of deploying this code using Google’s serverless Cloud Functions, as it seemed the simplest way to get going. A function could be triggered by an upload of a new log file to a storage bucket – automation for the win. I’d already experimented with the data and clustering in a Jupyter notebook, using NumPy and Pandas, so putting this code into a single function was relatively straightforward. But then, indecision being rife in such organisations, it was decided that Functions was insecure and wouldn’t be allowed – I would need to use Dataflow, which fortunately makes more sense as a ‘production-like’ solution anyway. For those not in the know, Dataflow is a data pipeline service which acts as a ‘runner’ for Apache Beam. According to its website: “Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines”. 

Pipelines can be written in Java or Python (no prizes for guessing which I chose), but as the data in Beam is stored in objects called ‘PCollections’ rather than Pandas dataframes, I had to adapt my functions to work with Dataflow. Overall it’s a much cleaner, simpler way of extracting, transforming and loading (ETL) data – Beam treats a dataset as a collection of elements, and transformations can either apply element-wise (in parallel, akin to the ‘map’ in MapReduce) or by grouping/reducing the elements. PCollections can also be branched into multiple pipelines, or combined. Pipelines can be run locally on a laptop using the ‘direct runner’ method during development, then pushed to the Dataflow runner when ready to deploy (though I did find a fair few changes were required to get it to run in Dataflow, mostly around how input parameters are created). Dataflow is designed to use either batch or streaming inputs, but in this case I only tried the former; the input was again simply a file uploaded to a bucket.

This is what the main pipeline looks like for calculating baseline login windows. Starting with log entries parsed into dictionaries, we extract the username and login time (as an integer representing the number of minutes into the week) as a tuple, group by username, run the clustering function get_cluster_boundaries (defined further up in the file), and write the results to BigQuery (Google's data warehouse).

As you can see, this is a really nice and clear way of constructing a data processing pipeline. Beam has built-in transforms for pretty much anything you could want to do, but you can also write custom transforms if these are insufficient for your needs. In the ‘Clustering’ step above, I use a combination of the ‘Map’ transform, with an anonymous (lambda) function calling my get_cluster_boundaries function in order to pass in the additional parameters for the DBSCAN algorithm (‘eps’ and ‘min_samples’). The second pipeline (not shown), for detecting anomalous login times in new logs, reads in the baselines from BigQuery as a ‘side input’, so that each element can be checked against it. All potential anomalies are inserted into a separate BigQuery table for further analysis.
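Stripped of the Beam plumbing, the core check in that second pipeline boils down to something like this (the function name is illustrative; in the pipeline, the windows come from the BigQuery side input keyed by username):

```python
def is_anomalous(minute_of_week, windows):
    """True if a login time falls outside all of a user's baseline windows."""
    return not any(start <= minute_of_week <= end for start, end in windows)

# e.g. a Monday baseline of 8.30-9.45 and 12.30-13.45:
baseline = [(510, 585), (750, 825)]
is_anomalous(540, baseline)   # 9.00am login → False, within baseline
is_anomalous(180, baseline)   # 3.00am login → True, worth flagging
```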

When the pipeline is triggered, you get a nice graph in the GCP console showing the flow of data through the transforms, and can identify any failures in the pipeline (in the image here, you can see the pipeline splits into three branches – the centre one is shown in the code snippet above). While it’s possible to simply run the pipeline command locally and specify the DataflowRunner, the more production-ready method is to upload the pipeline to Google Cloud as a ‘template’ stored in a bucket. This lets us specify parameters to define at runtime, and means the pipeline can be triggered from anywhere using the Google Dataflow SDK, rather than running a Python command. A good option in GCP is Cloud Composer, which provides powerful scheduling and orchestration of tasks across GCP services, but a simpler tool called Cloud Scheduler allows for cron-like scheduling of tasks, including running Dataflow pipelines. 

As an extension, I added pipeline sections to add hostnames to a user's baseline, so that logins to any unusual hosts could be added as a risk factor. I also added 'leaving dates' to some users (as an employee leaving the business soon could be a higher risk), added made-up job titles for all the users, and assigned each job role an inherent base risk score – for example, a database admin or a senior manager has a much higher base risk than a graduate. This base score (ranging from 1-10) was then multiplied if the user was found to have an anomalous login (either in time or to an unusual host), and increased again if the employee was soon to leave, to calculate a 'risk score' for that login event using BigQuery's SQL-like query capabilities. The output was a table of potentially risky logins, sorted with the highest risk scores first. This kind of output could then form the basis for a triage process followed by an investigation if required.
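The scoring itself lived in BigQuery SQL, but the logic amounts to something like the sketch below. The multiplier and leaver bonus here are made-up weights for illustration:

```python
def risk_score(base, anomalous_time, anomalous_host, leaving_soon,
               anomaly_multiplier=3, leaver_bonus=5):
    """Illustrative login risk score: the role's base risk (1-10) is
    multiplied for an anomalous login (unusual time or host), then
    bumped further for a soon-to-leave employee."""
    score = base
    if anomalous_time or anomalous_host:
        score *= anomaly_multiplier
    if leaving_soon:
        score += leaver_bonus
    return score

# A senior DBA (base 10) logging in at an odd hour, shortly before leaving:
risk_score(10, True, False, True)    # scores far above a graduate's
risk_score(1, False, False, False)   # unremarkable login, base score only
```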

I learned a lot during this PoC – brushed up on my Python, buffed up on NumPy and Pandas, gained knowledge about clustering algorithms, got intimate with Dataflow and struggled with some fiddly SQL. Having not had much to do with data-focussed projects before, it was a fun new challenge, and it even inspired me to finally take my Google Cloud Data Engineering Professional certification exam (which fortunately I passed). While the PoC only employed a small subset of the data tools available on GCP, Dataflow and BigQuery are likely to be key components in any scalable big data project on Google Cloud.
