Mahout Spark Shell Locally

I was playing around with Mahout, and one of the things I wanted to try was Mahout’s Spark Shell on my local machine.

There is a nice example for doing this, but I hit a stack dump the moment I tried to start the Mahout shell using `bin/mahout spark-shell`:

java.lang.RuntimeException: java.io.InvalidClassException: org.apache.spark.rpc.netty.RequestMessage; local class incompatible: stream classdesc serialVersionUID = -2221986757032131007, local class serialVersionUID = -5447855329526097695
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)

The problem was that the Spark version Mahout was built against was 1.6.2 (specified in the POM file), while the Spark cluster I had started was running the latest version, 2.0.1, so the two sides could not deserialize each other’s RPC messages.
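A quick way to confirm the mismatch (a rough sketch; the paths, and the assumption that the POM exposes a spark.version property, are from my setup):

# Spark version Mahout's build expects (assuming the property is named spark.version)
grep -m1 '<spark.version>' ~/repos/mahout/pom.xml

# Spark version actually installed at SPARK_HOME
$SPARK_HOME/bin/spark-submit --version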

Here are the steps I took to get it going:

Installing Mahout & Spark on your local machine

  • Create a directory for Mahout somewhere on your machine, change into it, and check out the master branch of Apache Mahout from GitHub (the full command sequence is sketched after this list)
  • Change to the mahout directory and build Mahout using `mvn -DskipTests clean install`
  • Download Apache Spark (http://www.apache.org/dyn/closer.cgi/spark)
    • Note: Download the source code not just the pre-built binaries.
    • Select ‘Source Code’ in the Project type
  • Change to the directory where you unpacked Spark and type `sbt/sbt assembly` to build it
    • Takes close to an hour
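
Roughly, the install steps above in shell form (a sketch; the directory layout and Spark version are assumptions from my machine):

mkdir -p ~/repos && cd ~/repos
git clone https://github.com/apache/mahout.git   # master branch is checked out by default
cd mahout
mvn -DskipTests clean install

# unpack the Spark source release somewhere, e.g. ~/packages/spark-1.6.2, then build it
cd ~/packages/spark-1.6.2
sbt/sbt assembly                                  # takes close to an hour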

 

Starting Mahout’s Spark shell

  • Go to the directory where you unpacked Spark and type `sbin/start-all.sh` to start Spark locally
  • Open a browser and point it to http://localhost:8080/ to check whether Spark started successfully. Copy the URL of the Spark master at the top of the page (it starts with spark://)
    • This starts Spark in standalone mode with 1 master and 1 worker
    • Verify that the Spark version shown is 1.6.2
  • Define the following environment variables in a file `mymahoutsparksettings.sh` and source it so they are set:
abgoswam@abgoswam-ubuntu:~/repos/mahout$ cat mymahoutsparksettings.sh
#!/usr/bin/env bash

export MAHOUT_HOME=/home/abgoswam/repos/mahout
export SPARK_HOME=/home/abgoswam/packages/spark-1.6.2
export MASTER=spark://abgoswam-ubuntu:7077

echo "Set variables for Mahout"
abgoswam@abgoswam-ubuntu:~/repos/mahout$

  • Finally, change to the directory where you unpacked Mahout and type `bin/mahout spark-shell`; you should see the shell start up and get the mahout> prompt. (The full sequence is sketched below.)
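
Putting it together, a minimal sketch of the start-up sequence (directory names are assumptions from my setup):

cd ~/packages/spark-1.6.2 && sbin/start-all.sh    # start Spark standalone (1 master, 1 worker)
cd ~/repos/mahout && source mymahoutsparksettings.sh
bin/mahout spark-shell                            # should end at the mahout> prompt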


Windowing Operations in Azure Stream Analytics

Windowing is a very common operation in stream analytics.

Beneath the surface, there is a lot of complex data structuring going on to support these windowing operations. I would love to dig deeper into that someday.

Example:

Here is an example of a query I wrote recently using windowing operators in Azure Stream Analytics. It shows three interesting things:
1. Windowing (a hopping window 2 hours long that advances every hour, so each event contributes to two windows)
2. CTEs
3. Aggregation over string columns (using TopOne)

WITH ContextReward AS (
    SELECT 
        eventid,
        TopOne() OVER (ORDER BY [EventEnqueuedUtcTime] ASC) CR,
        MAX (reward) AS reward
    FROM Input
    GROUP BY eventid, HoppingWindow(Duration(hour, 2), Hop(hour, 1))
)

SELECT 
    reward,
    eventid, 
    CR.actionname AS actionname,
    CR.age AS age,
    CR.gender AS gender,
    CR.weight AS weight,
    CR.actionprobability
INTO OutputWindow
FROM ContextReward

SELECT * INTO Output FROM Input 
SELECT * INTO OutputCSV FROM Input

 


Making REST calls to send data to an Azure EventHub

I recently encountered a situation where I had to use pure REST Calls to send data to an Azure Event Hub.

Tips:

  • If you are used to working with the client libraries (C#, Python), you will find that they do a lot behind the scenes. It is not trivial to go from using a library to making pure REST calls
  • My first approach, using Fiddler to capture the traffic and re-purposing those calls, failed.
    • I am not sure why the calls fail to show up in Fiddler. I tried a few things, like enabling HTTPS decryption, but I was not able to get the outgoing traffic to show up in Fiddler
  • The references below give a good idea of how I made some progress.

REST Call to send data:

I finally got it to work with something like this:

POST https://simplexagpmeh.servicebus.windows.net/simplexagpmeh/messages?timeout=60&api-version=2014-01 HTTP/1.1
User-Agent: Fiddler
Authorization: SharedAccessSignature sr=http%3a%2f%2fsimplexagpmeh.servicebus.windows.net%2f&sig=RxvSkhotfGEwERdiaA8oLr7X9u5XLeDI8TCK5DhDPP8%3d&se=1476214239&skn=RootManageSharedAccessKey
Content-Type: application/atom+xml;type=entry;charset=utf-8
Host: simplexagpmeh.servicebus.windows.net
Content-Length: 153
Expect: 100-continue

{ "DeviceId" : "ArduinoYun",
  "SensorData" : [ { "SensorId" : "awk",
        "SensorType" : "temperature",
        "SensorValue" : 24.5
      } ]
}
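
The non-obvious part is building the SharedAccessSignature for the Authorization header. Here is a rough bash sketch of how such a token can be generated and the same request sent with curl; it assumes openssl and jq are available, the key value is a placeholder, and the namespace/key name are the ones from the request above:

#!/usr/bin/env bash
NAMESPACE="simplexagpmeh"
HUB="simplexagpmeh"
KEY_NAME="RootManageSharedAccessKey"
KEY="<your shared access key>"      # placeholder

URI="https://${NAMESPACE}.servicebus.windows.net/${HUB}"
ENCODED_URI=$(printf '%s' "$URI" | jq -sRr @uri)              # URL-encode the resource URI
EXPIRY=$(( $(date +%s) + 3600 ))                              # token valid for 1 hour
SIG=$(printf '%s\n%s' "$ENCODED_URI" "$EXPIRY" \
      | openssl dgst -sha256 -hmac "$KEY" -binary | base64)   # HMAC-SHA256 over "<encoded-uri>\n<expiry>"
ENCODED_SIG=$(printf '%s' "$SIG" | jq -sRr @uri)
SAS="SharedAccessSignature sr=${ENCODED_URI}&sig=${ENCODED_SIG}&se=${EXPIRY}&skn=${KEY_NAME}"

curl -i -X POST "${URI}/messages?timeout=60&api-version=2014-01" \
  -H "Authorization: ${SAS}" \
  -H "Content-Type: application/atom+xml;type=entry;charset=utf-8" \
  -d '{ "DeviceId" : "ArduinoYun", "SensorData" : [ { "SensorId" : "awk", "SensorType" : "temperature", "SensorValue" : 24.5 } ] }'

If the token is accepted, the send should come back with a 201 Created.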

References:


Getting Started with Apache Kafka

I’m doing some toe-dipping into Apache Kafka.

Linux:

Windows:

Commands from the Apache quick start documentation:

  • This gave me a good overview of what the system is doing.
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
[Now edit these new files and set the following properties:
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2]
> bin/kafka-server-start.sh config/server-1.properties &
> bin/kafka-server-start.sh config/server-2.properties &
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
> ps | grep server-1.properties
> kill -9 7564
[Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:]
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
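
An aside on the ps / kill step above: rather than copying the PID by hand, the broker running server-1.properties can be targeted in one line (a sketch, assuming a Unix-like shell):

> kill -9 $(ps ax | grep -i 'server-1.properties' | grep java | awk '{print $1}')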