Learning Machine Learning

I've been at ViSenze for almost half a year, and our CTO said we (Terry and I) should learn some machine learning, since there are a lot of PhDs in our company :). He recommended a course on Coursera - Neural Networks for Machine Learning.

I am now facing the Week 3 quiz, which requires some coding work in Octave.

Problems

The MacBook lying at home had not been used for quite a while. When I used brew to install Octave, a weird issue happened:

Error: uninitialized constant Formulary::FormulaNamespacefe4ce29a01455f41d6d0b08c39f76615::Octave::DevelopmentTools
Please report this bug:
https://git.io/brew-troubleshooting
/usr/local/Library/Taps/homebrew/homebrew-science/octave.rb:31:in `<class:Octave>'
/usr/local/Library/Taps/homebrew/homebrew-science/octave.rb:1:in `load_formula'

/usr/local/Library/Homebrew/formulary.rb:21:in `module_eval'
/usr/local/Library/Homebrew/formulary.rb:21:in `load_formula'

/usr/local/Library/Homebrew/formulary.rb:38:in `load_formula_from_path'
/usr/local/Library/Homebrew/formulary.rb:87:in `load_file'

/usr/local/Library/Homebrew/formulary.rb:78:in `klass'
/usr/local/Library/Homebrew/formulary.rb:74:in `get_formula'

/usr/local/Library/Homebrew/formulary.rb:171:in `get_formula'
/usr/local/Library/Homebrew/formulary.rb:211:in `factory'

/usr/local/Library/Homebrew/extend/ARGV.rb:18:in `block in formulae'
/usr/local/Library/Homebrew/extend/ARGV.rb:16:in `map'

/usr/local/Library/Homebrew/extend/ARGV.rb:16:in `formulae'
/usr/local/Library/Homebrew/cmd/install.rb:95:in `install'

/usr/local/Library/brew.rb:87:in `<main>'

Solution

I found there is a guide on the Octave wiki. Let me summarize it:

sudo chown -R $(whoami) /usr/local
brew update && brew upgrade # it will take some time
brew cask install xquartz
brew install octave # it will take a lot of time...

After brew install octave

When I typed octave in my terminal, another issue appeared:

$octave
dyld: Library not loaded: /usr/local/opt/suite-sparse/lib/libsuitesparseconfig.4.5.4.dylib
Referenced from: /usr/local/bin/octave
Reason: image not found
Abort trap: 6

Actually, I had already installed suite-sparse, version 4.5.5, so the fix is to create a soft link:

cd /usr/local/opt/suite-sparse/lib/
ln -s libsuitesparseconfig.4.5.5.dylib libsuitesparseconfig.4.5.4.dylib

Create your application

It is easier if we can launch Octave from Applications instead of the terminal. We can use Script Editor on macOS:

tell application "Terminal"
	do script "`which octave`; exit"
end tell

Screenshot

Alt text

When I used mvn to package some Java code as usual, a very weird exception occurred:

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.410 s
[INFO] Finished at: 2017-06-23T08:29:33+00:00
[INFO] Final Memory: 50M/612M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on project analysis-spark: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile failed: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 7967A1610B14A2BD, AWS Error Code: AccessDenied, AWS Error Message: Access Denied -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :analysis-spark

The exception says AWS access was denied; we use an S3-backed repository in our Maven settings, like below:

<settings>
  <servers>
    <server>
      <id>maven-s3-release-repo</id>
      <username>USERNAME</username>
      <password>PASSWORD</password>
    </server>
  </servers>
</settings>

I double checked that the authentication information was correct, and other developers could package successfully on their own computers.

Finally I found that the problem was caused by environment variables I had set on my Mac:

$env | grep AWS
AWS_SECRET_ACCESS_KEY=XXXXX
AWS_ACCESS_KEY_ID=XXXX

After unsetting them, packaging worked again. I think that is because the environment variables have higher priority than the configuration in ~/.m2/settings.xml.

Yes! ViSenze

It has been a long time since my last post; I have been busy settling down in Singapore. Yes, I joined ViSenze, a startup that has raised its Series B funding. I will act as Data Platform Lead, building our data products and technical architecture. ViSenze has so many excellent talents, and I have felt really happy working with these folks over the last month. Hope we all achieve great things.

About ViSenze

ViSenze is an artificial intelligence company that develops advanced visual search and image recognition solutions to help businesses in eCommerce, mCommerce and online advertising.

Using R&D in machine learning and computer vision technology, ViSenze can recommend visually similar items to online shoppers, either on e-commerce platforms when they browse or search by uploading a picture, or on content publisher platforms like social media and video networks.

The company is a spin-off from NExT, a leading research centre jointly established between National University of Singapore and Tsinghua University of China.

Welcome

Alt text

Background

In my previous blog post about compression benchmarks for Kafka, I ran some tests against Kafka 0.8.2.1. Kafka 0.10 has made a lot of progress, and this post aims to run some benchmarks on Kafka 0.10.

In this post, I’m going to test 3 parts:

  1. Producer - time cost, throughput, bandwidth, total traffic
  2. Consumer - time cost, throughput
  3. Capacity - disk usage, server/client CPU usage

Environment

Hardware Box

I use Docker on Mac to run two containers - zk and kafka.

Mac CPU: 2.5 GHz Intel Core i7
Mac Memory: 16GB
Mac Disk: 512GB SSD
Docker CPUs: 4
Docker Memory: 2G

docker-compose.yml is below:

version: "2"

services:
  zk:
    image: kafka-0.10_zk:1
    ports:
      - "2181:2181"

  kafka:
    depends_on:
      - zk
    image: kafka-0.10_kafka:1
    ports:
      - "9092:9092"
      - "9999:9999"
    command: --override zookeeper.connect=zk:2181

Software Box

All Kafka JVM parameters are left at their defaults, because the benchmark's main purpose is to compare the different compression algorithms against no compression.

Kafka Version: 0.10.1.0
JDK: 1.7.0-b147
Scala: 2.11
Brokers: 1
Kafka Replicas: 1
Kafka Partitions: 1

Log Files

The log file comes from nginx access logs; it uses 1.2GB of disk space and has 5,190,426 lines.

[root@780e74bff80d kafka_2.11-0.10.1.0]# du -sh nginx.log
1.2G nginx.log
[root@780e74bff80d kafka_2.11-0.10.1.0]# wc -l nginx.log
5190426 nginx.log
[root@780e74bff80d kafka_2.11-0.10.1.0]# tail -1 nginx.log
127.0.0.1 hello.com 127.0.0.1 127.0.0.1 - 30/Dec/2016:09:43:28 +0800 POST /index.php HTTP/1.1 200 64 -Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727) 127.0.0.1 127.0.0.1:80 0.016 0.016

Producer Benchmark

To test producer performance, I use kafka-console-producer.sh to send nginx.log to Kafka, dstat to get network metrics, and the Linux time command to measure time cost. The details are below:

  1. time (seconds) - measured by the Linux time command
  2. throughput (messages/second) - total lines / time
  3. bandwidth (MB/s) - measured by dstat -nT
  4. traffic (MB) - bandwidth * time (a short sketch of this arithmetic follows the list)
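
Only the time and the bandwidth are measured directly; the other two values are derived. A quick Python sketch of the arithmetic, plugging in the numbers from the none codec, batch.size 500 row further below:

# Derived-metric arithmetic; the three inputs come from the measurements above.
total_lines = 5190426          # wc -l nginx.log
elapsed_sec = 35.067           # `time` for the none codec, batch.size 500
bandwidth_mb_s = 38.3          # from `dstat -nT`

throughput = total_lines / elapsed_sec      # ~148014 msg/s
traffic_mb = bandwidth_mb_s * elapsed_sec   # ~1343 MB sent over the wire

print("throughput: %.2f msg/s, traffic: %.2f MB" % (throughput, traffic_mb))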

The command to send messages is:

[root@a7dd0e808964 kafka_2.11-0.10.1.0]# time bin/kafka-console-producer.sh --broker-list a7dd0e808964:9092 --topic test_producer --batch-size 2000 --compression-codec none < nginx.log
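
For reference, the same send can also be driven programmatically. Below is a minimal sketch using the kafka-python client (my own assumption - the post itself only uses the console tools); note that kafka-python's batch_size is measured in bytes, unlike the console tool's --batch-size, which counts messages:

from kafka import KafkaProducer  # pip install kafka-python

# Broker and topic match the benchmark above; compression_type accepts
# 'gzip', 'snappy', 'lz4' or None ('lz4' needs the lz4 python package installed).
producer = KafkaProducer(
    bootstrap_servers='a7dd0e808964:9092',
    compression_type='lz4',
    batch_size=64 * 1024,   # bytes, not messages
)

with open('nginx.log', 'rb') as f:
    for line in f:
        producer.send('test_producer', line.rstrip(b'\n'))

producer.flush()  # block until everything buffered has been sent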

The command to get network metrics is:

[root@a7dd0e808964 kafka_2.11-0.10.1.0]# dstat -nT
-net/total- --epoch---
recv send| epoch
0 0 |1483439091
86B 144B|1483439092
0 0 |1483439093

Detailed Metrics for Each Compression Codec

Compression Codec: none

batch.size time(sec) throughput(msg/s) bandwidth (MB/s) traffic (MB)
500 35.067 148014.54 38.3 1343.07
1000 35.579 145884.54 37.2 1323.54
1500 35.656 145569.5 38.3 1365.62
5000 31.905 162683.78 41.8 1333.63
10000 35.212 147405.03 38.3 1348.62
AVERAGE 34.68 149911.48 38.78 1342.9

Compression Codec: gzip

batch.size time(sec) throughput(msg/s) bandwidth (MB/s) traffic (MB)
500 73.302 70808.79 2.0 146.6
1000 68.695 75557.55 2.0 137.39
1500 72.471 71620.73 2.0 144.94
5000 76.469 67876.21 2.0 152.94
10000 73.865 70269.09 2.0 147.73
AVERAGE 72.96 71226.47 2.0 145.92

Compression Codec: snappy

batch.size time(sec) throughput(msg/s) bandwidth (MB/s) traffic (MB)
500 29.163 177979.84 24.5 714.49
1000 33.959 152843.9 21.4 726.72
1500 31.152 166616.14 23.5 732.07
5000 27.420 189293.440 28.0 767.76
10000 28.019 185246.65 26.9 753.71
AVERAGE 29.94 174395.99 24.86 738.95

Compression Codec: lz4

batch.size time(sec) throughput(msg/s) bandwidth (MB/s) traffic (MB)
500 17.937 289369.79 14.01 251.3
1000 17.837 290992.1 13.70 244.37
1500 17.143 302772.33 14.03 240.52
5000 18.525 280184.94 13.43 248.79
10000 20.567 252366.7 11.76 241.87
AVERAGE 18.4 283137.17 13.39 245.37

Summary

codec throughput bandwidth traffic
none 149911.48 38.78 1342.9
gzip 71226.47 2.0 145.92
snappy 174395.99 24.86 738.95
lz4 283137.17 13.39 245.37

Throughput Overview

Alt text

Bandwidth Overview

Alt text

Traffic Overview

Alt text

Percentage Overview (relative to the none codec)

codec throughput% bandwidth% traffic%
gzip 47.51 5.16 10.87
snappy 116.33 64.11 55.03
lz4 188.87 34.53 18.27

Alt text

Consumer Benchmark

Testing the consumer is much easier than testing the producer. Before the tests, I send the same nginx.log to Kafka with each compression codec - none, gzip, snappy and lz4 - then use kafka-console-consumer.sh to consume a fixed number of messages; in these tests the number is 5,000,000. What I need to measure is the time the procedure takes, from which we can also derive the throughput.

The bash command is:

time bin/kafka-console-consumer.sh --bootstrap-server 97200db31e2c:9092 --topic consumer_none --max-messages 5000000 --from-beginning > /dev/null
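
The same measurement can be scripted. A minimal sketch with the kafka-python client (again my assumption - the post uses the console consumer), counting a fixed number of messages and timing the loop:

import time
from kafka import KafkaConsumer  # pip install kafka-python

MAX_MESSAGES = 5000000

consumer = KafkaConsumer(
    'consumer_none',
    bootstrap_servers='97200db31e2c:9092',
    auto_offset_reset='earliest',   # same intent as --from-beginning
    enable_auto_commit=False,
)

start = time.time()
count = 0
for _ in consumer:                  # messages are discarded, we only count them
    count += 1
    if count >= MAX_MESSAGES:
        break
elapsed = time.time() - start
print('consumed %d messages in %.3fs (%.0f msg/s)' % (count, elapsed, count / elapsed))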

Detail Metrics

codec time(second) throughput(msg/s)
none 19.046 262522.31
gzip 39.493 126604.71
snappy 25.632 195068.66
lz4 23.846 209678.77

Alt text

Percentage (relative to the none codec):

codec time% throughput%
gzip 207.36 48.23
snappy 134.58 74.31
lz4 125.2 79.87

Alt text

Capacity Benchmark

In the previous blog post, I did not run tests for this part. The pressure each codec puts on the CPU is another important factor to consider. In this section I run some simple benchmarks on disk space and CPU usage with dstat.

Disk Usage

Although Kafka has its own retention policies, and they work well, disk space can still be a concern for engineers, especially in a large Kafka cluster. In the previous section, I sent nginx.log to Kafka with different codecs and measured the disk space each topic used. The numbers can simply be obtained with du -sh in the Kafka logs directory.

codec Disk Space(MB) Percentage%
none 1329.53 100
gzip 140.18 10.54
snappy 679.81 51.13
lz4 222.58 16.74

Alt text

CPU Usage

Compression and decompression mainly consume CPU, so I record usr, sys, wait and their total to measure how much CPU each codec uses. The data is collected by dstat as well. Note that my Docker setup has only 4 CPUs, and these tests are mainly meant to compare the codecs against each other, not to dig into absolute numbers, which will differ on different boxes.

The test is simple: I use dstat to record the system metrics I want and, meanwhile, run kafka-console-producer.sh or kafka-console-consumer.sh in another container (not the Kafka container) to send data to or consume data from Kafka.

I record metrics on both the client (which runs the console tools) and the server (which runs the Kafka broker).
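
For the record, here is roughly how the dstat samples can be aggregated - a post-processing sketch that assumes dstat was run with an --output CSV file (column names 'usr', 'sys', 'wai'), not necessarily the exact script used for this post:

import csv

def average_cpu(path='cpu.csv'):
    """Average the usr/sys/wai columns of a CSV written by `dstat -c --output cpu.csv`."""
    header, rows = None, []
    with open(path) as f:
        for row in csv.reader(f):
            if header is None:
                if 'usr' in row:                       # dstat prints a few metadata lines first
                    header = {name: i for i, name in enumerate(row)}
                continue
            try:
                rows.append({k: float(row[i]) for k, i in header.items()})
            except (ValueError, IndexError):
                continue                               # skip partial lines
    if not rows:
        raise ValueError('no samples found in %s' % path)
    avg = lambda k: sum(r[k] for r in rows) / len(rows)
    usr, sys_, wai = avg('usr'), avg('sys'), avg('wai')
    print('usr=%.2f sys=%.2f wait=%.2f total=%.2f' % (usr, sys_, wai, usr + sys_ + wai))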

Producer CPU Usage

Server Side

Metrics

codec usr sys wait total
none 41.08 6.56 7.08 54.72
gzip 31.28 1.70 0.45 33.43
snappy 36.89 4.12 4.13 45.14
lz4 33.72 2.77 1.37 37.86

Chart

Alt text

Client side

Metrics

codec usr sys wait total
none 41.00 6.52 7.09 54.61
gzip 31.28 1.70 0.44 33.42
snappy 36.65 4.13 4.07 44.85
lz4 33.70 2.76 1.36 37.82

Chart

Alt text

Consumer CPU Usage

Server side

Metrics

codec usr sys wait total
none 19.41 5.05 7.74 32.2
gzip 25.63 1.43 0.47 27.53
snappy 23.92 4.23 3.90 32.05
lz4 24.13 2.35 0.90 27.38

Chart

Alt text

Client Side

Metrics

codec usr sys wait total
none 19.47 5.05 7.77 32.29
gzip 18.84 1.16 0.41 20.41
snappy 23.82 4.36 3.86 32.04
lz4 24.13 2.35 0.91 27.39

Chart

Alt text

Conclusion

GZIP has the best compression ratio but the lowest performance, while LZ4 has the best performance. On the capacity side, none and snappy cause some CPU wait, for which I don't know the reason. Actually, the CPU usage of the codecs is almost the same, so I don't think CPU capacity should be the main factor when choosing a codec.

To summarize the benchmarks briefly: use GZIP if you need to spend less bandwidth and disk space, and use LZ4 to maximize performance.

There is also one problem this benchmark does not cover - how much CPU the Kafka server would use with a huge number of clients. Does the server-side CPU usage grow linearly with the number of clients? I have not run this test because I only have two containers.

Environment

The main purpose of this test is to choose a tool for collecting logs on servers. In this test, we use Apache Flume and Heka to collect raw log data and send it into Kafka. Kafka runs with an all-default configuration. The Linux box is CentOS 6.4 with a 12-core CPU, 45GB of memory and JDK 1.7.

Benchmark Metrics

  1. mps - messages per second: this value is fetched from Kafka's JMX port. We read the total message count from Kafka, then read it again one second later; the delta is the message count per second - mps. We can get the count from the command line (a small polling sketch follows this list):

java -jar /tmp/cmdline-jmxclient-0.10.3.jar - localhost:9999 kafka.server:name=MessagesInPerSec,type=BrokerTopicMetrics Count

  2. channelSize (only for Flume): this metric is the number of messages Flume has not yet handled. It can be used to determine whether Flume is healthy; a healthy Flume instance should keep this value low. We can get it from Flume's HTTP service:

curl -s localhost:34545/metrics | python -m json.tool | grep ChannelSize

  3. %cpu and %mem: we get these two values directly from the Linux top command:

flume_pid=`jps -m | grep App | cut -d' ' -f1`
flume_cpu=`top -b -n1 -p $flume_pid | grep $flume_pid | awk '{print $9}'`
flume_mem=`top -b -n1 -p $flume_pid | grep $flume_pid | awk '{print $10}'`
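
To make the mps measurement concrete, here is a small Python 3 polling sketch built around the jmxclient command above. It assumes the counter value is the last number the tool prints (the tool's exact output format and stream may differ, so treat this as a sketch):

import re
import subprocess
import time

# The exact command shown above.
JMX_CMD = [
    'java', '-jar', '/tmp/cmdline-jmxclient-0.10.3.jar', '-',
    'localhost:9999',
    'kafka.server:name=MessagesInPerSec,type=BrokerTopicMetrics',
    'Count',
]

def read_count():
    # Scan both stdout and stderr and take the last integer as the counter value.
    proc = subprocess.run(JMX_CMD, capture_output=True, text=True)
    return int(re.findall(r'\d+', proc.stdout + proc.stderr)[-1])

prev = read_count()
while True:
    time.sleep(1)
    cur = read_count()
    print('mps: %d' % (cur - prev))   # delta over roughly one second
    prev = cur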

Benchmark Items

We used a tool to write log lines into the log files at different rates - 5K, 10K and 50K mps.
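
That tool is not shown in the post; a minimal, hypothetical stand-in that appends roughly a fixed number of lines per second to the file being tailed could look like this:

import time

def generate(path='/tmp/example.log', rate=5000,
             line='127.0.0.1 - - "GET /index.php HTTP/1.1" 200 64\n'):
    # Append `rate` lines, then sleep out the rest of the second to hold the target mps.
    with open(path, 'a') as f:
        while True:
            start = time.time()
            for _ in range(rate):
                f.write(line)
            f.flush()
            time.sleep(max(0.0, 1.0 - (time.time() - start)))

generate(rate=5000)   # 5K mps; use 10000 / 50000 for the other runs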

Flume Benchmarks

Flume Configuration

JVM

JAVA_OPTS="-Xms256m -Xmx256m -Dflume.monitoring.type=http -Dflume.monitoring.port=34545"

Flume Configuration

agent.sources = fileSource
agent.channels = memChannel
agent.sinks = kafkaSink

agent.sources.fileSource.type = exec
agent.sources.fileSource.command = tail -F /tmp/example.log
agent.sources.fileSource.channels = memChannel

agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 1000000
agent.channels.memChannel.transactionCapacity = 10000

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = flume_test
agent.sinks.kafkaSink.brokerList = kafka:9092
agent.sinks.kafkaSink.batchSize = 200
agent.sinks.kafkaSink.channel = memChannel

Flume Benchmark Results

5K mps

Alt text

Alt text

10K mps

Alt text

Alt text

50K mps

Alt text

Alt text

Heka Benchmarks

Heka Configuration

[accesslogs]
type = "LogstreamerInput"
log_directory = "/tmp"
file_match = 'example\.log'

[PayloadEncoder]
append_newlines = false

[KafkaOutput]
type = "KafkaOutput"
message_matcher = "TRUE"
topic = "heka_test"
addrs = ["kafka:9092"]
encoder = "PayloadEncoder"
max_buffer_time = 1000
max_buffered_bytes = 1048576
required_acks = "NoResponse"

Heka Benchmark Results

5K mps

Alt text

Alt text

10K mps

Alt text

Alt text

50K mps

Alt text

Alt text

The Comparison

About Performance

As the charts show, both Flume and Heka can handle a huge amount of log data at very high speed. We usually deploy them on servers that run business workloads, which means they must not put much pressure on the host (in our usage they sit in front of Apache Kafka, sending data to it, and both work well in that role). Considering the system cost, Heka has the advantage on %MEM while Flume does on %CPU. Heka is written in Golang, so its memory usage is much lower than that of Flume, which is written in Java; for %CPU, Flume uses less than Heka. Charts are below:

Alt text

Alt text

About Usability

Features

Flume's configuration files are written in Java properties format while Heka's are in TOML. Writing TOML is simpler, and it is also easier to read. As for the architectures, Flume's is simpler. It contains three main components - Source, Channel and Sink. The Source defines the data source; the Sink defines the destination of the data; the Channel is the buffering pipeline between the Source and the Sink. In our example, the log file is the Source and Kafka is the Sink. The Source puts data into the Channel while the Sink takes data from the Channel. The architecture is shown below:

Alt text

For Heka, the architecture is a little more complex. Its Inputs and Outputs are like Flume's Source and Sink, but Heka adds many features such as Splitters, Decoders, Filters and Encoders. These features let Heka process data directly inside Heka itself and, at the same time, make it a more powerful tool than Flume. The architecture is shown below:

Alt text

Monitor

Both Flume and Heka expose an HTTP API for fetching metrics.

For Flume, we can add the JVM parameters -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 to enable the HTTP API. The data looks like below (obtained with curl -s localhost:34545/metrics | python -m json.tool):

{
       "CHANNEL.memChannel": {
           "ChannelCapacity": "1000000",
           "ChannelFillPercentage": "0.0",
           "EventPutAttemptCount": "10",
           "EventPutSuccessCount": "10",
           "EventTakeAttemptCount": "16",
           "EventTakeSuccessCount": "10",
           "StartTime": "1458887648261",
           "StopTime": "0",
           "Type": "CHANNEL"
           },
           ...
   }

For Heka, we need to add this to its configuration file; ticker_interval defines how often Heka refreshes the data:

[DashboardOutput]
ticker_interval = 1

The result looks like:

"outputs": [{
    "Name": "KafkaOutput",
    "ProcessMessageCount": {
        "representation": "count",
        "value": 756698
    },
    "ProcessMessageDiscards": {
        "representation": "count",
        "value": 0
    },
    "ProcessMessageFailures": {
        "representation": "count",
        "value": 0 }
    },
    ...
}

Problems

  1. Flume cannot continue reading a log file after a restart. Flume uses tail to get log lines from log files, so it does not know which line it has already read or which line it should start from after a restart. Heka keeps the offset in a meta file and can continue reading after restarting. On this point, Heka is better.

  2. Flume does not pick up new log files matching the configuration. For example, if we configure it to read log files named 'access_*.log', Flume will not pick up new matching files unless we modify the configuration. Heka, on the other hand, monitors the directory and rescans for log files at an interval set in the configuration file. The parameter is rescan_interval:

During logfile rotation, or if the logfile is not originally present on the system, this interval is how often the existence of the logfile will be checked for. The default of 5 seconds is usually fine. This interval is in milliseconds.

Conclusions

Both Flume and Heka are good choices for our use case. Heka is written in Go and we can start or stop it in seconds, while Flume always takes minutes to initialize or shut down. Heka can also resume reading where it left off, which Flume cannot. I think Heka is the better choice.

The bad news is that Heka was deprecated by Mozilla in May 2016; you can find the detailed information here. I think there are too many features in Heka, which makes it a heavy tool. I like it very much, and my plan is to strip out the additional components and keep it a simple but powerful tool.

Problem

Recently I met a problem: how to compare two JSON documents. As we know, JSON is a dict-like data structure, and the order of its contents does not matter in a comparison. For example, {'name': 'frank', 'age': '12'} equals {'age': '12', 'name': 'frank'}. In the real world there may be more complex data structures like lists, tuples or nested dicts.

Solutions

In Python (both Python 2 and Python 3), we can use pprint to pretty-print JSON and other data structures; it sorts and formats the data first. There is also a function in the pprint module called pformat, which returns the formatted output as a string. Let's see the example.

import pprint

d1 = {'name': 'frank', 'age': '12'}
pformat_d1 = pprint.pformat(d1) # pformat_d1: {'age': '12', 'name': 'frank'}
d2 = {'age': '12', 'name': 'frank'}
pformat_d2 = pprint.pformat(d2) # pformat_d2: {'age': '12', 'name': 'frank'}
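
With the two formatted strings in hand, the comparison itself is just string equality. For JSON-serializable data, json.dumps with sort_keys=True gives a similar normalization (an alternative I am adding here, not part of the pprint approach above):

import json
import pprint

d1 = {'name': 'frank', 'age': '12', 'tags': ['a', 'b']}
d2 = {'age': '12', 'tags': ['a', 'b'], 'name': 'frank'}

# pprint sorts dict keys, so equal dicts format to identical strings
assert pprint.pformat(d1) == pprint.pformat(d2)

# alternative: serialize with sorted keys and compare
assert json.dumps(d1, sort_keys=True) == json.dumps(d2, sort_keys=True)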

Background

We have been using Spark in a Scala environment for a long time and it does its job well. Now we need to query Spark from Python 3 scripts, and this post is a simple how-to guide for pyspark.

Prerequisites

  1. jdk
  2. spark
  3. python3
  4. set JAVA_HOME and SPARK_HOME in env

pyspark lives in SPARK_HOME/python, and you need to put it on your Python path. You can cp or ln the directory into your Python 3 path; another way is to use sys.path.append in your scripts, as sketched below.
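
A minimal sketch of the sys.path.append approach - the bundled py4j zip name varies by Spark release, so the glob below is an assumption; check your own SPARK_HOME/python/lib:

import glob
import os
import sys

spark_home = os.environ['SPARK_HOME']
sys.path.append(os.path.join(spark_home, 'python'))
# pyspark also needs the bundled py4j; the zip name depends on the Spark version
sys.path.extend(glob.glob(os.path.join(spark_home, 'python', 'lib', 'py4j-*.zip')))

from pyspark import SparkContext  # should now import cleanly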

Example

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext


if __name__ == '__main__':
    conf = SparkConf()
    conf.set('spark.executor.memory', '16g')
    sc = SparkContext('spark://10.3.99.42:7077', appName='test', conf=conf)
    sc.setLogLevel('ERROR')
    hiveContext = HiveContext(sc)
    hiveContext.setConf('hive.metastore.uris', 'thrift://127.0.0.1:9083')
    hiveContext.setConf('io.compression.codecs', 'org.apache.hadoop.io.compress.SnappyCodec')
    hiveContext.sql('use default')
    hiveContext.sql("select * from table").show()

Intro

LZO is a compression codec for Hadoop-ecosystem components like Hadoop, HBase, Spark and so on. The problem I faced was that I wanted to read and process data from an LZO-compressed Hive table via SparkSQL. In this post, I will show you how to enable LZO in a Spark box without extra Hadoop libs and how to run SparkSQL from IntelliJ IDEA on a Mac.

Box

  • spark-1.6.2-bin-hadoop2.6
  • jdk-1.7
  • hadoop-lzo-0.4.20
  • maven3

The Spark node runs on CentOS 7, and IntelliJ IDEA is on OS X El Capitan.

Enable lzo in spark node

You can run Spark standalone without any other components, so it is easy to test whether we have enabled LZO.

Install linux lzo dependencies by yum

yum install -y lzo-devel lzop

clone hadoop-lzo repo from github

Clone it from https://github.com/twitter/hadoop-lzo, then build it with Maven.

git clone https://github.com/twitter/hadoop-lzo
cd hadoop-lzo
mvn package

After packaging, you will get a jar named hadoop-lzo-0.4.20-SNAPSHOT.jar in the target directory.

Add the jar to the Spark environment by setting the variable in ${SPARK_HOME}/conf/spark-env.sh.

export SPARK_CLASSPATH=${HADOOP-LZO_HOME}/target/hadoop-lzo-0.4.20-SNAPSHOT.jar

${SPARK_HOME} and ${HADOOP-LZO_HOME} are the directories where you put Spark and the hadoop-lzo repo.

Run spark in standalone mode.

cd ${SPARK_HOME}
./sbin/start-master.sh
./sbin/start-slave.sh spark://${SPARK_IP}:${SPARK_PORT}

Test whether LZO is available in Spark via spark-shell.

cd ${SPARK_HOME}
./bin/spark-shell
scala> sqlContext.setConf("hive.metastore.uris", "thrift://cloudera-server:9083")

scala> sqlContext.setConf("io.compression.codecs", "com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec")

scala> sqlContext.sql("use default")
res2: org.apache.spark.sql.DataFrame = [result: string]

scala> sqlContext.sql("select cdn_ip, timestamp from total_cdn_log where date='2016-07-05' and hour='01' limit 10").show()
+-----------+----------+
| cdn_ip| timestamp|
+-----------+----------+
|cdn_ip_none|1467651600|
|cdn_ip_none|1467651600|
|cdn_ip_none|1467651600|
|cdn_ip_none|1467651600|
|cdn_ip_none|1467651600|
|cdn_ip_none|1467651600|
|cdn_ip_none|1467651600|
|cdn_ip_none|1467651600|
|cdn_ip_none|1467651600|
|cdn_ip_none|1467651600|
+-----------+----------+

We must set io.compression.codecs to com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec because Spark uses the Snappy codec by default.

Enable lzo in IntelliJ IDEA

I assume you already have your SparkSQL job running - that is to say, the Scala setup works in your IntelliJ IDEA. macOS is a little different from RedHat because it is based on FreeBSD and NetBSD implementations.

Install lzo dependencies by brew

brew is an excellent package management tool for the Mac; its full name is Homebrew. If you have not installed brew, go to its website http://brew.sh/. Installing it is easy, with only a one-line command.

/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

Okay, let’s install lzo dependencies.

brew install lzo lzop

Package hadoop-lzo

REMARK: Please DO NOT copy a hadoop-lzo jar built on another system - such as the CentOS box we run Spark on - because the Maven build of hadoop-lzo includes a gcc step that is strongly tied to the system it runs on.

To build hadoop-lzo, we need to specify the location where lzo is installed.

$ brew list lzo
/usr/local/Cellar/lzo/2.09/include/lzo/ (13 files)
/usr/local/Cellar/lzo/2.09/lib/liblzo2.2.dylib
/usr/local/Cellar/lzo/2.09/lib/ (2 other files)
/usr/local/Cellar/lzo/2.09/share/doc/ (7 files)

Then we can fetch hadoop-lzo and package the jar.

$ git clone https://github.com/twitter/hadoop-lzo
$ cd hadoop-lzo
$ C_INCLUDE_PATH=/usr/local/Cellar/lzo/2.09/include/ LIBRARY_PATH=/usr/local/Cellar/lzo/2.09/lib/ mvn clean package

REMARK: The guide at https://gist.github.com/zedar/c43cbc7ff7f98abee885 - Add LZO compression codecs to the Apache Hadoop and Spark - is slightly wrong about C_INCLUDE_PATH. It says C_INCLUDE_PATH=/usr/local/Cellar/lzo/2.06/include/lzo/, but it should be C_INCLUDE_PATH=/usr/local/Cellar/lzo/2.09/include/. We do not need to add the lzo suffix.

Config the IntelliJ IDEA box

The first step is to add hadoop-lzo-0.4.20-SNAPSHOT.jar to your Scala project.

Alt text

The next step is to add java.library.path to your JVM options via the menu Run - Edit Configurations.... The option to add looks like -Djava.library.path=/Users/yaorenjie/Documents/git/hadoop-lzo/target

Alt text

REMARK: You must specify the target directory produced by the Maven build. If you want to use another directory, you MUST move all the files in target along with it, because hadoop-lzo needs the native GPL library from the target directory.

$ll native/Mac_OS_X-x86_64-64/lib
total 208
-rwxr-xr-x 1 yaorenjie staff 21780 Jul 7 17:36 libgplcompression.0.dylib
-rw-r--r-- 1 yaorenjie staff 1164 Jul 7 17:36 libgplcompression.la
lrwxr-xr-x 1 yaorenjie staff 25 Jul 7 17:36 libgplcompression.dylib -> libgplcompression.0.dylib
-rw-r--r-- 1 yaorenjie staff 69744 Jul 7 17:36 libgplcompression.a

If you want to dig deeper, you can find this loading process in GPLNativeCodeLoader.

Sometimes, for security reasons, you can only connect to MySQL via a certain server. But apparently, coding in an IDE (e.g. PyCharm) is easier. For this circumstance, we can use sshtunnel.

So we have a MySQL server at 127.0.0.3:3306, whose MySQL login name and password are MYSQL_USER and MYSQL_PASSWORD. We can only connect to this MySQL server via the host 127.0.0.2:22, whose login name and password are root and SERVER_PASSWORD. Let's figure out how sshtunnel works.

import MySQLdb
from sshtunnel import SSHTunnelForwarder

with SSHTunnelForwarder(('127.0.0.2', 22), ssh_username='root', ssh_password='SERVER_PASSWORD', remote_bind_address=('127.0.0.3', 3306)) as server:
    conn = MySQLdb.connect(host='127.0.0.1', port=server.local_bind_port, user='MYSQL_USER', passwd='MYSQL_PASSWORD')
    cursor = conn.cursor()

For various reasons, pypi.python.org may be unavailable or slow to download from. We can simply use the douban or v2ex PyPI mirrors in two ways.

edit env

Editing the config makes pip use your chosen PyPI mirror by default. On Linux and Mac, the file is $HOME/.pip/pip.conf, and the content is:

[global]
find-links=http://pypi.douban.com/simple
[install]
find-links=
http://pypi.douban.com/simple
http://pypi.v2ex.com/simple