Complex Event Processing with AWS DynamoDB Streams and Lambda for Malware Detection

Figure: DynamoDB + DynamoDB Streams + Lambda CEP architecture

With the growing popularity of capturing and analyzing time-series (i.e. event-based) data, AWS's recently introduced DynamoDB Streams, coupled with their Lambda capability, make great building blocks for Complex Event Processing (CEP) systems. You get something of an all-in-one capability: use DynamoDB as your event store and hook Lambda handlers up to the DynamoDB Streams, so that as you put events into your event store you can also process them with your Lambda handlers.

Example Use Case

To demonstrate this, let's look at the use case of event processing for malware detection, as illustrated in the figure above. In this use case we have a threat intelligence feed that gives us MD5 hashes of known malware files, which we ingest into a Malware Catalog stored in DynamoDB. On the devices we're trying to protect, we have sensors/agents deployed that detect when new software or files are installed and then send an event to our Event Store in DynamoDB. These events report the ID of the device on which the software was installed, the name of the software, the MD5 hash of the software, and some other details. We have a DynamoDB Stream enabled on the Event Store table, so as we put these software installation events into the store, they also get streamed to our Lambda handler. As events arrive, the handler extracts the MD5 hashes of the installed software and compares them against the MD5 hashes of known malware files in our Malware Catalog. If there's a match, indicating that malware was installed on a device, the Lambda handler sends an alert to SNS, which sends a notification email to an administrator. This is a simplified example (malware has gotten sophisticated enough these days that such techniques aren't always effective at detecting it), but it's an easy-to-understand way to demonstrate how you can leverage DynamoDB, Streams, and Lambda to implement CEP systems.
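The core correlation primitive here is just an MD5 digest compared against a set of known-bad digests. As a minimal, stdlib-only sketch of that idea (the class name and sample strings are illustrative, not from the article's code):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Set;

// Minimal sketch of the detection primitive: hash the installed
// software's bytes and test membership in a set of known-bad digests.
public class Md5Check {
    // Hex-encode the MD5 digest of the given bytes.
    public static String md5Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(data);
            StringBuilder sb = new StringBuilder(32);
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // True if the software's digest appears in the malware catalog.
    public static boolean isKnownMalware(byte[] software, Set<String> catalog) {
        return catalog.contains(md5Hex(software));
    }

    public static void main(String[] args) {
        Set<String> catalog = Set.of(md5Hex("evil payload".getBytes(StandardCharsets.UTF_8)));
        System.out.println(isKnownMalware("evil payload".getBytes(StandardCharsets.UTF_8), catalog)); // true
        System.out.println(isKnownMalware("benign app".getBytes(StandardCharsets.UTF_8), catalog));   // false
    }
}
```

The rest of the article is essentially about distributing this membership test across DynamoDB, Streams, and Lambda.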

Now let’s look at some code! All of this was implemented using the AWS SDK for Java. In this article I’m only going to show the important code snippets but all of the code is available on GitHub.

Setup

First we need to create the DynamoDB tables for our Event Store and Malware Catalog, populate the Malware Catalog, and create the SNS topic. I have a class called CEPDriver that does all of this setup.

public static void main(String[] args) {
	try {
		if (args.length != 1) {
			System.out.println("Please supply one argument");
		}
		else if (args[0].equalsIgnoreCase("init")) {
			createTable(Event.TABLE_NAME, 10L, 10L, Event.KEY_ATTR, "S", null, null);
			createTable(Malware.TABLE_NAME, 10L, 10L, Malware.KEY_ATTR, "S", null, null);
			SensorSimulator.populateMalwareCatalog(dynamoDBClient);
			createSNSTopic();
		}
		else if (args[0].equalsIgnoreCase("generate")) {
			SensorSimulator.generateEvent(dynamoDBClient);
		}
		else if (args[0].equalsIgnoreCase("cleanup")) {
			deleteSNSTopic();
			deleteTable(Event.TABLE_NAME);
			deleteTable(Malware.TABLE_NAME);
		}
		else {
			System.out.println("Please supply a valid argument");
		}
	}
	catch (Exception ex) {
		System.err.println(ex);
	}
}

Now let's look specifically at the code for creating the Event Store table in Listing 2. This is based on the example provided in the AWS documentation for “Working with Tables Using the AWS SDK for Java Document API.” With DynamoDB, you don't need to define the schema for the entire table when you initially create it, just the schema for the primary key. The documentation describes how to define the primary key. Since we want our Lambda handler to be invoked when we put events into our Event Store, we need to enable a stream on this table; this is the StreamSpecification set up in the Event Store branch of Listing 2. The documentation describes how to set up streams on DynamoDB tables. Since we're only going to be putting new events (and not updating events) in the Event Store, we set the StreamViewType to NEW_IMAGE, telling it that we want the stream record to contain the entire item as it appears after it was modified. This gives us the entire contents of the software installation event as reported by the sensor, so our handler has all the data it needs to perform malware detection.

private static void createTable(String tableName, long readCapacityUnits, long writeCapacityUnits, 
	String hashKeyName, String hashKeyType, String rangeKeyName, String rangeKeyType) {
	try {
		...
		CreateTableRequest request = null;
		Table table;
		if (tableName.equals(Event.TABLE_NAME)) {
			StreamSpecification streamSpecification = new StreamSpecification();
			streamSpecification.setStreamEnabled(true);
			streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);

			request = new CreateTableRequest()
				.withTableName(tableName)
				.withKeySchema(keySchema)
				.withProvisionedThroughput(new ProvisionedThroughput()
					.withReadCapacityUnits(readCapacityUnits)
					.withWriteCapacityUnits(writeCapacityUnits))
				.withStreamSpecification(streamSpecification);
		}
		else if (tableName.equals(Malware.TABLE_NAME)) {
			...
		}

		request.setAttributeDefinitions(attributeDefinitions);
		table = dynamoDB.createTable(request);
		table.waitForActive();
	} 
	catch (Exception e) {
		System.err.println("CreateTable request failed for " + tableName);
		System.err.println(e.getMessage());
	}
}	

Next, let's look at the code for creating the Malware Catalog table, shown in Listing 3. There are a couple of differences here from the creation of the Event Store table. First, we're not enabling a stream on this table, since we're not going to be processing threat intelligence events. In a real system you may want to, as it makes sense to query the Event Store to see if you may have already installed a piece of malware when new threat intel describing that malware arrives, but for the sake of keeping this proof-of-concept simple, we skipped that; there is no stream on the Malware Catalog table. Second, since our business logic for malware detection is to compare MD5 hashes of installed software with MD5 hashes of known malware in the catalog, we'll be querying the Malware Catalog table against the MD5 hash attribute, so we create a secondary index on that attribute to speed things up; this is the GlobalSecondaryIndex built in Listing 3. DynamoDB supports two types of secondary indexes: global and local. We're using a global one simply because our key for the secondary index consists only of a hash key, whereas the key for a local secondary index has to consist of a hash and a range key. The documentation gives some guidelines on when to use a global versus a local secondary index. Earlier I said that when you create a DynamoDB table you don't have to specify the schema up front; this is true except when you're creating a secondary index that references a specific attribute, in which case you have to define that attribute when you create the table. Since we're creating a secondary index on the MD5 hash attribute, we define that attribute up front at the start of the Malware Catalog branch in Listing 3. For this proof-of-concept, we're not ingesting a real threat intelligence feed, so I just have some code that puts some fake data into the Malware Catalog that we can correlate against.

private static void createTable(String tableName, long readCapacityUnits, long writeCapacityUnits, 
	String hashKeyName, String hashKeyType, String rangeKeyName, String rangeKeyType) {
	try {
		...
		
		if (tableName.equals(Event.TABLE_NAME)) {
			...
		}
		else if (tableName.equals(Malware.TABLE_NAME)) {
			attributeDefinitions.add(new AttributeDefinition()
				.withAttributeName(Malware.MD5_HASH_ATTR)
				.withAttributeType("S"));
				
			GlobalSecondaryIndex md5HashIndex = new GlobalSecondaryIndex()
				.withIndexName(Malware.INDEX_NAME)
				.withProvisionedThroughput(new ProvisionedThroughput()
					.withReadCapacityUnits(readCapacityUnits)
					.withWriteCapacityUnits(writeCapacityUnits))
				.withKeySchema( new KeySchemaElement()
					.withAttributeName(Malware.MD5_HASH_ATTR)
					.withKeyType(KeyType.HASH))
				.withProjection(new Projection()
					.withProjectionType("ALL"));
					
			request = new CreateTableRequest()
				.withTableName(tableName)
				.withKeySchema(keySchema)
				.withProvisionedThroughput( new ProvisionedThroughput()
					.withReadCapacityUnits(readCapacityUnits)
					.withWriteCapacityUnits(writeCapacityUnits))
				.withGlobalSecondaryIndexes(md5HashIndex);
		}
		
		...
	} 
	catch (Exception e) {
		System.err.println("CreateTable request failed for " + tableName);
		System.err.println(e.getMessage());
	}
}	

After our tables in DynamoDB have been created, we need to create an SNS topic that our Lambda handler will publish alerts to if it detects that malware has been installed on a device. The code for creating this SNS topic is shown in Listing 4. Once the topic has been created, we need to get the topic ARN and save it to the database so that our Lambda handler can retrieve it and publish to it later. To keep things simple, we just save it to the same Malware Catalog table. Notice that in Listing 4 we're using the AWS SDK for Java's high-level object persistence model API (DynamoDBMapper) to do the save.

private static void createSNSTopic() {
	CreateTopicRequest createTopicRequest = new CreateTopicRequest(Malware.MALWARE_NOTIFY_TOPIC);
	CreateTopicResult createTopicResult = snsClient.createTopic(createTopicRequest);

	//Save the topic ARN to the database so that our handler can get it later
	String topicARN = createTopicResult.getTopicArn();
	DynamoDBMapper mapper = new DynamoDBMapper(dynamoDBClient);	
	Malware malwareTopic = new Malware();
	malwareTopic.setMalwareID(Malware.MALWARE_NOTIFY_TOPIC);
	malwareTopic.setMalwareName(topicARN);
	malwareTopic.setDescription(topicARN);
	malwareTopic.setMD5Hash("");
	mapper.save(malwareTopic);
}

After the topic's been created, we use the AWS console to create an email subscription, as shown in Figure 1, so that an email notification goes out when malware is detected on a device.

Figure 1: Setting up email subscription to SNS topic


Reading Data from DynamoDB Streams in the Lambda Handler

The trickiest part of this whole exercise was figuring out how to implement a Java-based Lambda handler that would be triggered by DynamoDB Streams, and then how to extract the event stream record and its data in the handler implementation. All of the AWS documentation for this uses Node.js in its examples. I ended up implementing a handler function that takes Java input and output streams, based on the example in the AWS documentation, and then inspecting the contents of the input stream to figure out how to deserialize it into the data I needed. DynamoDB Streams just sends JSON to the handler, looking something like what's shown in Figure 2. What you'll see in the Keys and NewImage elements depends on the schema of your data.

Figure 2: JSON input to Lambda handler from DynamoDB Stream

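For our Event Store, the stream payload has roughly this shape (a hand-written approximation with illustrative values; the actual Keys and NewImage contents depend on your table's schema):

```json
{
  "Records": [
    {
      "eventID": "1",
      "eventName": "INSERT",
      "eventSource": "aws:dynamodb",
      "dynamodb": {
        "Keys": {
          "EventID": { "S": "a1b2c3" }
        },
        "NewImage": {
          "EventID": { "S": "a1b2c3" },
          "DeviceID": { "S": "abcde.tieuluu.com" },
          "SoftwareMD5Hash": { "S": "d41d8cd98f00b204e9800998ecf8427e" }
        },
        "StreamViewType": "NEW_IMAGE"
      }
    }
  ]
}
```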

As shown in Listing 5, my handler method just takes an InputStream object, reads the contents into a ByteArrayOutputStream, and then deserializes the JSON using Google's Gson library.

public void handler(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {   
	LambdaLogger logger = context.getLogger();
	ByteArrayOutputStream baos = new ByteArrayOutputStream();
	int letter;        
	
	while((letter = inputStream.read()) != -1)
	{
		baos.write(letter);                     
	}        
	
	List<Record> records = deserialize(new String(baos.toByteArray())).records;

	for (Record record : records) {
		Event event = record.dbRecord.event;
		SoftwareMD5Hash md5Hash = event.softwareMD5Hash;	        	
		testForMalware(md5Hash.value, event.deviceID.value, logger);        	
	}      
}

public static DDBStreamRecords deserialize(String input) {
	Gson g = new Gson();
	DDBStreamRecords recs = g.fromJson(input, DDBStreamRecords.class);
	return recs;
}

The JSON just gets deserialized into a set of model objects I created, shown in Figure 3. All of the data that's needed is contained in the Event object, but because of the shape of the JSON produced by DynamoDB Streams and the way Gson deserialization works, I had to create a bunch of wrapper objects.

Figure 3: Model objects deserialized from DynamoDB Stream JSON


I'm also using the Event object with DynamoDB's Java object persistence model API, mentioned earlier, in a SensorSimulator object that pretends to be the sensors on the monitored devices and sends software installation events to the Event Store. Because the DynamoDB Java SDK uses reflection and different annotations than Gson for serialization/deserialization, the code for this Event object is kind of ugly. Listing 6 shows a snippet so you can see what I mean. The @DynamoDBTable annotation on the class specifies which DynamoDB table the object maps to. The @DynamoDBHashKey annotation on the getEventID getter specifies that the eventID attribute should be used as the hash key portion of the primary key for the DynamoDB table. The DynamoDB Java SDK uses JavaBean conventions in conjunction with reflection and the @DynamoDBAttribute annotations, as on getEventType, to access the members and map them to the JSON that's stored in the database itself. Gson, on the other hand, uses the @SerializedName annotation to figure out the serialization/deserialization with JSON. Also, the DynamoDB JSON contains an extra level of nesting, e.g. the "S" key before you can get to the actual value of an attribute, so I had to create additional wrapper classes, such as the nested EventID class, so that Gson would deserialize properly.
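To make that extra level of nesting concrete: each attribute value arrives wrapped under a type token such as "S". Stripped of Gson and the AWS SDK, unwrapping it amounts to a two-level map lookup (a stdlib-only sketch; the class and method names are illustrative):

```java
import java.util.Map;

// Stdlib-only sketch of the DynamoDB attribute nesting: each attribute
// maps to a {typeToken -> value} map, e.g. {"EventID": {"S": "abc"}}.
public class AttributeUnwrap {
    // Pull the string value out of the {"S": ...} wrapper for one attribute.
    public static String getString(Map<String, Map<String, String>> image, String attr) {
        Map<String, String> wrapped = image.get(attr);
        return wrapped == null ? null : wrapped.get("S");
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> newImage = Map.of(
            "EventID", Map.of("S", "evt-123"),
            "SoftwareMD5Hash", Map.of("S", "d41d8cd98f00b204e9800998ecf8427e"));
        System.out.println(getString(newImage, "EventID"));          // evt-123
        System.out.println(getString(newImage, "SoftwareMD5Hash"));
    }
}
```

The wrapper classes like EventID in Listing 6 are just this same two-level structure expressed as typed fields so that Gson can bind to them.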

@DynamoDBTable(tableName=Event.TABLE_NAME)
public class Event {	
	public static final String TABLE_NAME = "EventStore";
	public static final String KEY_ATTR = "EventID";
	public static final String EVENT_TYPE_ATTR = "EventType";
	public static final String SW_NAME_ATTR = "SoftwareName";
	public static final String SW_CPE_ATTR = "SoftwareCPE";	
	public static final String SW_MD5_HASH_ATTR = "SoftwareMD5Hash";
	public static final String DEVICE_ID_ATTR = "DeviceID";
	public static final String TIMESTAMP_ATTR = "Timestamp";
	public static final String SW_INSTALL = "SW INSTALL";
	public static final String SW_UNINSTALL = "SW UNINSTALL";

	@SerializedName(KEY_ATTR)
	public EventID eventID = new EventID();
	public static class EventID {
		@SerializedName("S")
		public String value;
	}
	@DynamoDBHashKey(attributeName=KEY_ATTR)  
	public String getEventID() { 		
		return eventID.value;		
	}
	public void setEventID(String id) {    	
		this.eventID.value = id;
	}

	@SerializedName(EVENT_TYPE_ATTR)
	public EventType eventType = new EventType();
	public static class EventType {
		@SerializedName("S")
		public String value;
	}
	@DynamoDBAttribute(attributeName=EVENT_TYPE_ATTR)  
	public String getEventType() {		
		return eventType.value;		
	}
	public void setEventType(String type) {    	
		this.eventType.value = type;
	}
	
	...

Testing for Malware

Now that our Lambda handler is able to read events and extract the necessary data from the DynamoDB Stream for the Event Store table, we can implement the logic to see if a piece of software that has just been installed matches any known malware in our Malware Catalog. So just to recap, the sequence of activities is as follows: sensors detect when new software is installed on a device and report a software installation event to the Event Store table in DynamoDB; Lambda detects new records in the DynamoDB Stream and invokes our handler function; our handler function reads in the input stream, deserializes the data into a set of objects, extracts the necessary data elements (e.g. the MD5 hash of the installed software and the ID of the device it was installed on), and invokes the testForMalware method shown in Listing 7. Just keep in mind, as mentioned earlier, that this is a simplified implementation meant to demonstrate the concept and the utility of DynamoDB Streams and Lambda. In Listing 7 we take the MD5 hash of the installed software and use it to query the secondary index of the Malware Catalog table, which is keyed by the MD5 hash of each malware entry in the catalog. Then we test for matches by iterating through the results of the query: if the query returns results, there are matches. A match means the software just installed is a piece of malware, so we send an alert to the SNS topic, and an email goes out to the appropriate parties through the email subscription we set up earlier.
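Stripped of the AWS calls, the detection logic in Listing 7 reduces to an index lookup followed by one alert per match. Here's an in-memory sketch of just that logic, with a HashMap standing in for the MD5-hash secondary index and the returned list standing in for the SNS publishes (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// In-memory sketch of testForMalware: the map plays the role of the
// MD5-hash global secondary index, the returned list the role of SNS.
public class MalwareCheck {
    // Returns the alert messages that would be published for this event.
    public static List<String> testForMalware(String md5Hash, String deviceID,
                                              Map<String, List<String>> md5Index) {
        List<String> alerts = new ArrayList<>();
        // "Query the index": every catalog entry with a matching hash is a hit.
        for (String malwareName : md5Index.getOrDefault(md5Hash, List.of())) {
            alerts.add(deviceID + " infected with malware: " + malwareName);
        }
        return alerts;
    }

    public static void main(String[] args) {
        Map<String, List<String>> index = Map.of("feedface", List.of("EvilWare"));
        System.out.println(testForMalware("feedface", "host1.tieuluu.com", index));
        System.out.println(testForMalware("00000000", "host2.tieuluu.com", index)); // []
    }
}
```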

private void testForMalware(String md5Hash, String deviceID, LambdaLogger logger) {
	Table table = dynamoDB.getTable(Malware.TABLE_NAME);

	//Get the topic ARN that we put into the table when the SNS topic was set up
	Item topicItem = table.getItem(Malware.KEY_ATTR, Malware.MALWARE_NOTIFY_TOPIC);
	String topicARN = (String) topicItem.get(Malware.NAME_ATTR);

	Index index = table.getIndex(Malware.INDEX_NAME);
	ItemCollection<QueryOutcome> items = null;

	QuerySpec querySpec = new QuerySpec();        
	querySpec.withKeyConditionExpression(Malware.MD5_HASH_ATTR + " = :v_md5Hash")
		.withValueMap(new ValueMap()
		.withString(":v_md5Hash", md5Hash));

	items = index.query(querySpec);

	Iterator<Item> iterator = items.iterator();
	while (iterator.hasNext()) {
		Item item = iterator.next();
		String msg = "=========>>>>>>> " + deviceID + " infected with malware: " + item.get(Malware.NAME_ATTR);
		PublishRequest publishRequest = new PublishRequest(topicARN, msg);
		snsClient.publish(publishRequest);
	}	
}	

Registering the Lambda Handler

Now that the code for the handler is complete, we need to register it with AWS so that we can start using it. The documentation provides a pretty good step-by-step guide. I just used the AWS console for speed and convenience. Here are screenshots of the key steps.

Figure 4: Start by selecting the dynamodb-process-stream blueprint

Figure 5: Select DynamoDB as the event source type and EventStore as the DynamoDB table

Figure 6: Configure the function

When configuring the function, make sure to select Java 8 as the runtime, as it defaults to Node.js. To upload the function code, my Maven build creates a handler-1.0-SNAPSHOT.jar that can be uploaded. Next, type in the Handler name; the syntax is [fully qualified class name]::[name of handler method]. Then choose the Role that AWS Lambda will assume when executing the handler. At a minimum, the role needs permissions to invoke the Lambda function and query DynamoDB. The documentation provides more information on the Lambda permissions model.

Testing Everything

To test all of this, rather than setting up a network of devices, installing sensors, and getting them to publish to the Event Store, I just created a simple class to simulate all of that. This SensorSimulator class randomly publishes software installation events to the Event Store, and every once in a while it sets an MD5 hash that matches some malware in the Malware Catalog. The code is shown in Listing 8.

public static void generateEvent(AmazonDynamoDBClient client) throws IOException, InterruptedException {
	DynamoDBMapper mapper = new DynamoDBMapper(client);				
	BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

	do {
		Event event = new Event();
		event.setEventID(generateRandomString(20, Mode.ALPHA_NUM));
		event.setEventType(Event.SW_INSTALL);
		//Timestamp each event as it's generated, not once before the loop
		event.setTimestamp(dateFormatter.format(new Date(System.currentTimeMillis())));
		String vendor = generateRandomString(10, Mode.ALPHA_NUM);
		String product = generateRandomString(12, Mode.ALPHA_NUM);
		event.setSoftwareCPE("cpe:/a:" + vendor + ":" + product);
		event.setSoftwareName(vendor.toUpperCase() + " " + product.toUpperCase());
		event.setDeviceID(generateRandomString(5, Mode.ALPHA_NUM).toLowerCase() + ".tieuluu.com");

		int random = (int) Math.floor(Math.random()*20);
		if (random < badHashes.length) {
			//Occasionally use a hash that matches known malware in the catalog
			event.setSoftwareMD5Hash(badHashes[random]);
		}
		else {
			event.setSoftwareMD5Hash(generateRandomString(32, Mode.HEXA));
		}
		mapper.save(event);

		long sleepTime = (long)(Math.random()*7000);			
		Thread.sleep(sleepTime);
	} while ((!br.ready() || !((br.readLine()).equalsIgnoreCase("stop"))));		
}
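Listing 8 leans on a generateRandomString(length, mode) helper that isn't shown in the snippet. A plausible stdlib-only sketch of it (the Mode enum and the alphabets are my assumptions, not the article's actual code):

```java
import java.util.Random;

// Hypothetical sketch of the generateRandomString helper used by the
// simulator; the Mode enum and alphabets here are assumptions.
public class RandomStrings {
    public enum Mode { ALPHA_NUM, HEXA }

    private static final String ALPHA_NUM = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    private static final String HEXA = "0123456789abcdef";
    private static final Random RANDOM = new Random();

    public static String generateRandomString(int length, Mode mode) {
        String alphabet = (mode == Mode.HEXA) ? HEXA : ALPHA_NUM;
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // A fake 32-character "MD5 hash" like the ones the simulator generates.
        System.out.println(generateRandomString(32, Mode.HEXA));
    }
}
```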

If everything works, then every once in a while you should get an email, like the one shown in Figure 7, at the address that was used to subscribe to the SNS topic that the Lambda handler sends alerts to.

Figure 7: Email alert indicating that some device was infected with some malware


Summary

With the introduction of DynamoDB Streams and the ability to trigger Lambda functions from activity on those streams, AWS has made it really easy to build complex event processing applications that ingest and analyze time-series data. Complex event processing systems are often written as a set of standing queries that the event data is “streamed through,” and as we've just demonstrated with this example, that is easily done by implementing those queries as Lambda functions and hanging them off DynamoDB Streams. Previously, you'd have to set up some kind of message bus or enterprise service bus that event sources would publish to, and create various subscriber applications that would consume and process those events. And if you wanted to save those events (you usually do), you'd have to create yet another consumer that would listen for them and write them to a database. Now you have an all-in-one capability: your event sources just put events into DynamoDB as the event store, DynamoDB Streams serves as the bus for consuming those events, and Lambda functions hang off those streams to do the processing. This trio of components (DynamoDB + DynamoDB Streams + Lambda) is a very powerful capability.