Make your web site faster

Google’s mod_pagespeed speeds up your site and reduces page load time. This open-source Apache HTTP server module automatically applies web performance best practices to pages and their associated assets (CSS, JavaScript, images) without requiring you to modify your existing content or workflow.

Features
  • Automatic website and asset optimization
  • Latest web optimization techniques
  • 40+ configurable optimization filters
  • Free, open-source, and frequently updated
  • Deployed by individual sites, hosting providers, CDNs

How does mod_pagespeed speed up websites?

mod_pagespeed improves web page latency and bandwidth usage by changing the resources on that web page to implement web performance best practices. Each optimization is implemented as a custom filter in mod_pagespeed; these filters are executed when the Apache HTTP server serves the website's assets. Some filters simply alter the HTML content, while others rewrite references to CSS, JavaScript, or images so they point to more optimized versions.

mod_pagespeed implements custom optimization strategies for each type of asset referenced by the website, to make them smaller, reduce the loading time, and extend the cache lifetime of each asset. These optimizations include combining and minifying JavaScript and CSS files, inlining small resources, and others. mod_pagespeed also dynamically optimizes images by removing unused meta-data from each file, resizing the images to specified dimensions, and re-encoding images to be served in the most efficient format available to the user.

mod_pagespeed ships with a set of core filters designed to safely optimize the content of your site without affecting its look or behavior. In addition, it provides a number of more advanced filters that the site owner can turn on for larger performance gains.
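For example, a site owner might keep the safe core filter set and selectively enable a few additional filters in the module's configuration file. The following is only a sketch; the configuration file path depends on your distribution, and which filters are worth enabling depends on your site:

# pagespeed.conf (e.g. /etc/apache2/mods-available/pagespeed.conf on Debian/Ubuntu,
# /etc/httpd/conf.d/pagespeed.conf on CentOS/Fedora)
ModPagespeed on
# start from the safe core set of filters ...
ModPagespeedRewriteLevel CoreFilters
# ... and selectively enable a few more aggressive ones
ModPagespeedEnableFilters collapse_whitespace,remove_comments
ModPagespeedEnableFilters defer_javascript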

mod_pagespeed can be deployed and customized for individual web sites, and it is also used by large hosting providers and CDNs to help their users improve the performance of their sites, lower the latency of their pages, and decrease bandwidth usage.

Installing mod_pagespeed

Supported platforms

  • CentOS/Fedora (32-bit and 64-bit)
  • Debian/Ubuntu (32-bit and 64-bit)

To install the packages on Debian/Ubuntu, run the following commands as root:

dpkg -i mod-pagespeed-*.deb
apt-get -f install

For CentOS/Fedora, please execute (also as root):

yum install at  # if you do not already have 'at' installed
rpm -U mod-pagespeed-*.rpm

Installing mod_pagespeed will add the Google repository so your system will automatically keep mod_pagespeed up to date. If you don’t want Google’s repository added, run sudo touch /etc/default/mod-pagespeed before installing the package.

You can also download a number of system tests. These are the same tests available on ModPageSpeed.com.

What is installed

  • The mod_pagespeed packages install two versions of the mod_pagespeed code itself, mod_pagespeed.so for Apache 2.2 and mod_pagespeed_ap24.so for Apache 2.4.
  • Configuration files: pagespeed.conf, pagespeed_libraries.conf, and (on Debian) pagespeed.load. If you modify one of these configuration files, that file will not be upgraded automatically in the future.
  • A standalone JavaScript minifier, pagespeed_js_minify, based on the one used in mod_pagespeed, which can both minify JavaScript and generate metadata for library canonicalization.
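A quick way to check that the module is actually loaded and rewriting pages is to look for it in Apache's module list and for the X-Mod-Pagespeed response header. This is only a sketch; the exact command name and header value vary by platform and version:

# list loaded Apache modules (apache2ctl on Debian/Ubuntu)
$ apachectl -M | grep pagespeed
# pages served through mod_pagespeed carry an X-Mod-Pagespeed response header
$ curl -s -D - -o /dev/null http://localhost/ | grep -i x-mod-pagespeed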

Facebook Events Join the Contextual-Computing Party

Facebook made a tweak to its Events system this week, adding a little embedded forecast that shows projected weather on the day of the event. It’s a small change, but part of a big shift in computing.

Facebook CEO Mark Zuckerberg at a product launch earlier this month. Photo: Alex Washburn/Wired

The new feature, described by Facebook in briefings with individual reporters, pulls forecasts for the location of the event from monitoring company Weather Underground and attaches it to the Facebook pages of events happening within the next 10 days. The data is also shown while the event is being created, helping organizers avoid rained-out picnics and the like.

The change makes Facebook more sensitive to contextual information, data like location and time of day that the user doesn’t even have to enter. Facebook rival Google has drawn big praise for its own context-sensitive application Google Now, which, depending on your habits, might show you weather and the day’s appointments when you wake up, traffic information when you get in your car, and your boarding pass when you arrive at the airport. Google Now was so successful on Android smartphones that Google is reportedly porting the app to Apple’s iOS.

Apple’s own stab at contextual computing, the Siri digital assistant, has been less successful, but that seems to have more to do with implementation issues – overloaded servers, bad maps, and tricky voice-recognition problems – than with the idea of selecting information based on location and other situational data.

Hungry as Facebook is to sell ever-more-targeted ads at ever-higher premiums, expect the social network to add more context-sensitive features. One natural step is putting the Graph Search search engine on mobile phones and tailoring results more closely to location. Another is to similarly upgrade Facebook’s rapidly evolving News Feed, which already filters some information based on your past check-ins. Done right, pushing information to Facebook users based on context could multiply the social network’s utility. Done wrong, it could be creepy on a whole new level.

Running a Multi-Broker Apache Kafka 0.8 Cluster on a Single Node

In this article I describe how to install, configure and run a multi-broker Apache Kafka 0.8 (trunk) cluster on a single machine. The final setup consists of one local ZooKeeper instance and three local Kafka brokers. We will test-drive the setup by sending messages to the cluster via a console producer and receiving those messages via a console consumer. I will also describe how to build Kafka for Scala 2.9.2, which makes it much easier to integrate Kafka with other Scala-based frameworks and tools that require Scala 2.9 instead of Kafka’s default Scala 2.8.

What we want to do

Here is an overview of what we want to do:

  • Build Kafka 0.8-trunk for Scala 2.9.2.
    • I also provide instructions for the default 2.8.0, just in case.
  • Use a single machine for this Kafka setup.
  • Run 1 ZooKeeper instance on that machine.
  • Run 3 Kafka brokers on that machine.
  • Create a Kafka topic called “zerg.hydra” and send/receive messages for that topic via the console. The topic will be configured to use 3 partitions and 2 replicas per partition.

The purpose of this article is not to present a production-ready configuration of a Kafka cluster. However, it should get you started with using Kafka as a distributed messaging system in your own infrastructure.

Installing Kafka

Background: Why Kafka and Scala 2.9?

Personally I’d like to use Scala 2.9.2 for Kafka – which is still built for Scala 2.8.0 by default as of today – because many related software packages that are of interest to me (such as Finagle, Kestrel) are based on Scala 2.9. Also, the current versions of many development and build tools for Scala (e.g. IDEs, sbt) require at least version 2.9. If you are working in a similar environment you may want to build Kafka for Scala 2.9 just like Michael G. Noll did – otherwise you can expect to run into issues such as Scala version conflicts.

Option 1 (preferred): Kafka 0.8-trunk with Scala 2.9.2

Unfortunately the current trunk of Kafka has problems building against Scala 2.9.2 out of the box. Michael G. Noll created a fork of Kafka 0.8-trunk that includes the required fix (a change to one file) in the branch “scala-2.9.2”. The fix ties the Scala version used by Kafka’s shell scripts to 2.9.2 instead of 2.8.0.

The following instructions will use this fork to download, build and install Kafka for Scala 2.9.2:

$ cd $HOME
$ git clone git@github.com:miguno/kafka.git
$ cd kafka
# this branch includes a patched bin/kafka-run-class.sh for Scala 2.9.2
$ git checkout -b scala-2.9.2 remotes/origin/scala-2.9.2
$ ./sbt update
$ ./sbt "++2.9.2 package"

Option 2: Kafka 0.8-trunk with Scala 2.8.0

If you are fine with Scala 2.8, build and install Kafka as follows.

$ cd $HOME
$ git clone git@github.com:apache/kafka.git
$ cd kafka
$ ./sbt update
$ ./sbt package

Configuring and running Kafka

Unless noted otherwise all commands below assume that you are in the top level directory of your Kafka installation. If you followed the instructions above, this directory is $HOME/kafka/.

Configure your OS

For Kafka 0.8 it is recommended to increase the maximum number of open file handles, because Kafka 0.8 keeps more file handles open than 0.7 did. The exact number depends on your usage patterns, of course, but on the Kafka mailing list the ballpark figure “tens of thousands” was shared:

In Kafka 0.8, we keep the file handles for all segment files open until they are garbage collected. Depending on the size of your cluster, this number can be pretty big. Few 10 K or so.

For instance, to increase the maximum number of open file handles for the user kafka to 98,304 (change kafka to whatever user you are running the Kafka daemons with – this can be your own user account, of course) you must add the following line to /etc/security/limits.conf:

/etc/security/limits.conf
kafka    -    nofile    98304
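
The new limit only applies to sessions started after the file has been edited. To confirm that it is picked up, check the limits from a fresh shell running as that user; a quick sketch:

$ su - kafka      # open a new shell as the kafka user
$ ulimit -Sn      # soft limit for open files, should now print 98304
$ ulimit -Hn      # hard limit for open files, should now print 98304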

Start ZooKeeper

Kafka ships with a reasonable default ZooKeeper configuration for our simple use case. The following command launches a local ZooKeeper instance.

Start ZooKeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties

By default the ZooKeeper server will listen on *:2181/tcp.
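
If you want to verify that ZooKeeper is up before starting the brokers, you can use its four-letter-word commands; this sketch assumes netcat (nc) is installed:

# a healthy ZooKeeper instance answers "imok"
$ echo ruok | nc localhost 2181
imok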

Configure and start the Kafka brokers

We will create 3 Kafka brokers, whose configurations are based on the default config/server.properties. Apart from the settings below, the configurations of the brokers are identical.

The first broker:

Create the config file for broker 1
$ cp config/server.properties config/server1.properties

Edit config/server1.properties and replace the existing config values as follows:

broker.id=1
port=9092
log.dir=/tmp/kafka-logs-1

The second broker:

Create the config file for broker 2
$ cp config/server.properties config/server2.properties

Edit config/server2.properties and replace the existing config values as follows:

broker.id=2
port=9093
log.dir=/tmp/kafka-logs-2

The third broker:

Create the config file for broker 3
$ cp config/server.properties config/server3.properties

Edit config/server3.properties and replace the existing config values as follows:

broker.id=3
port=9094
log.dir=/tmp/kafka-logs-3
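
The three configurations differ only in broker.id, port, and log.dir. Instead of editing each file by hand, you could also generate them with a small shell loop; this is just a sketch that assumes GNU sed and that the default config/server.properties contains the three keys exactly as named above:

# generate server{1,2,3}.properties from the default config
$ for i in 1 2 3; do
    cp config/server.properties config/server$i.properties
    sed -i \
      -e "s/^broker.id=.*/broker.id=$i/" \
      -e "s/^port=.*/port=$((9091 + i))/" \
      -e "s|^log.dir=.*|log.dir=/tmp/kafka-logs-$i|" \
      config/server$i.properties
  done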

Now you can start each Kafka broker in a separate console:

Start the first broker in its own terminal session
$ env JMX_PORT=9999  bin/kafka-server-start.sh config/server1.properties

Start the second broker in its own terminal session
$ env JMX_PORT=10000 bin/kafka-server-start.sh config/server2.properties

Start the third broker in its own terminal session
$ env JMX_PORT=10001 bin/kafka-server-start.sh config/server3.properties

Here is a summary of the configured network interfaces and ports that the brokers will listen on:

        Broker 1     Broker 2      Broker 3
----------------------------------------------
Kafka   *:9092/tcp   *:9093/tcp    *:9094/tcp
JMX     *:9999/tcp   *:10000/tcp   *:10001/tcp
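
Once all three brokers are running, you can verify that they are listening on the expected ports; a quick sketch using netstat (ss works just as well):

# confirm the three Kafka listeners and the three JMX ports are bound
$ netstat -tln | egrep '9092|9093|9094|9999|10000|10001'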

Excursus: Topics, partitions and replication in Kafka

In a nutshell Kafka partitions incoming messages for a topic, and assigns those partitions to the available Kafka brokers. The number of partitions is configurable and can be set per-topic and per-broker.

First the stream [of messages] is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order.

A new feature of Kafka 0.8 is that those partitions will now be replicated across Kafka brokers to make the cluster more resilient against host failures:

Partitions are now replicated. Previously the topic would remain available in the case of server failure, but individual partitions within that topic could disappear when the server hosting them stopped. If a broker failed permanently any unconsumed data it hosted would be lost. Starting with 0.8 all partitions have a replication factor and we get the prior behavior as the special case where replication factor = 1. Replicas have a notion of committed messages and guarantee that committed messages won’t be lost as long as at least one replica survives. Replicas are byte-for-byte identical across replicas.

Producer and consumer are replication aware. When running in sync mode, by default, the producer send() request blocks until the message sent is committed to the active replicas. As a result the sender can depend on the guarantee that a message sent will not be lost. Latency-sensitive producers have the option to tune this to block only on the write to the leader broker or to run completely async if they are willing to forsake this guarantee. The consumer will only see messages that have been committed.

The following diagram illustrates the relationship between topics, partitions and replicas.

The relationship between topics, partitions and replicas in Kafka.

Logically this relationship is very similar to how Hadoop manages blocks and replication in HDFS.

When a topic is created in Kafka 0.8, Kafka determines how each replica of a partition is mapped to a broker. In general Kafka tries to spread the replicas across all brokers (source). Messages are first sent to the first replica of a partition (i.e. to the current “leader” broker of that partition) before they are replicated to the remaining brokers. Message producers may choose from different strategies for sending messages (e.g. synchronous mode, asynchronous mode). Producers discover the available brokers in a cluster and the number of partitions on each, by registering watchers in ZooKeeper.

If you wonder how to configure the number of partitions per topic/broker, here’s feedback from LinkedIn developers:

At LinkedIn, some of the high volume topics are configured with more than 1 partition per broker. Having more partitions increases I/O parallelism for writes and also increases the degree of parallelism for consumers (since partition is the unit for distributing data to consumers). On the other hand, more partitions adds some overhead: (a) there will be more files and thus more open file handlers; (b) there are more offsets to be checkpointed by consumers which can increase the load of ZooKeeper. So, you want to balance these tradeoffs.

Create a Kafka topic

In Kafka 0.8, there are 2 ways of creating a new topic:

  1. Turn on the auto.create.topics.enable option on the broker. When the broker receives the first message for a new topic, it creates that topic with num.partitions and default.replication.factor (see the config sketch after this list).
  2. Use the admin command bin/kafka-topics.sh.
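
For completeness, here is what the first (automatic) approach might look like in a broker’s server.properties; this is only a sketch, and we will not use it in this article:

# create topics on first use, with these broker-side defaults
auto.create.topics.enable=true
num.partitions=3
default.replication.factor=2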

We will use the latter approach. The following command creates a new topic “zerg.hydra”. The topic is configured to use 3 partitions and a replication factor of 2. Note that in a production setting we’d rather set the replication factor to 3, but a value of 2 is better for illustrative purposes (i.e. we intentionally use different values for the number of partitions and the replication factor to better see the effects of each setting).

Create the “zerg.hydra” topic
$ bin/kafka-topics.sh --zookeeper localhost:2181 \
    --create --topic zerg.hydra --partitions 3 --replication-factor 2

This has the following effects:

  • Kafka will create 3 logical partitions for the topic.
  • Kafka will create a total of two replicas (copies) per partition. For each partition it will pick two brokers that will host those replicas. For each partition Kafka will elect a “leader” broker.

Ask Kafka for a list of available topics. The list should include the new zerg.hydra topic:

List the available topics in the Kafka cluster
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
<snipp>
zerg.hydra
</snipp>

You can also inspect the configuration of the topic as well as the currently assigned brokers per partition and replica. Because a broker can host at most one replica of a given partition, Kafka uses the broker’s ID as the ID of the corresponding replica.

Describe the “zerg.hydra” topic
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic zerg.hydra
<snipp>
zerg.hydra
    configs:
    partitions: 3
        partition 0
        leader: 1 (192.168.0.153:9092)
        replicas: 1 (192.168.0.153:9092), 2 (192.168.0.153:9093)
        isr: 1 (192.168.0.153:9092), 2 (192.168.0.153:9093)
        partition 1
        leader: 2 (192.168.0.153:9093)
        replicas: 2 (192.168.0.153:9093), 3 (192.168.0.153:9094)
        isr: 2 (192.168.0.153:9093), 3 (192.168.0.153:9094)
        partition 2
        leader: 3 (192.168.0.153:9094)
        replicas: 3 (192.168.0.153:9094), 1 (192.168.0.153:9092)
        isr: 3 (192.168.0.153:9094), 1 (192.168.0.153:9092)
<snipp>

In this example output the first broker (with broker.id = 1) happens to be the designated leader for partition 0 at the moment. Similarly, the second and third brokers are the leaders for partitions 1 and 2, respectively.

The following diagram illustrates the setup (and also includes the producer and consumer that we will run shortly).

Overview of our Kafka setup including the current state of the partitions and replicas. The colored boxes represent replicas of partitions. “P0 R1” denotes the replica with ID 1 for partition 0. A bold box frame means that the corresponding broker is the leader for the given partition.

You can also inspect the local filesystem to see how the --describe output above matches actual files. By default Kafka persists topics as “log files” (Kafka terminology) in the log.dir directory.

Local files that back up the partitions of Kafka topics
$ tree /tmp/kafka-logs-{1,2,3}
/tmp/kafka-logs-1                   # first broker (broker.id = 1)
├── zerg.hydra-0                    # replica of partition 0 of topic "zerg.hydra" (this broker is leader)
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── zerg.hydra-2                    # replica of partition 2 of topic "zerg.hydra"
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
└── replication-offset-checkpoint

/tmp/kafka-logs-2                   # second broker (broker.id = 2)
├── zerg.hydra-0                    # replica of partition 0 of topic "zerg.hydra"
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── zerg.hydra-1                    # replica of partition 1 of topic "zerg.hydra" (this broker is leader)
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
└── replication-offset-checkpoint

/tmp/kafka-logs-3                   # third broker (broker.id = 3)
├── zerg.hydra-1                    # replica of partition 1 of topic "zerg.hydra"
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── zerg.hydra-2                    # replica of partition 2 of topic "zerg.hydra" (this broker is leader)
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
└── replication-offset-checkpoint

6 directories, 15 files

Caveat: Deleting a topic via bin/kafka-topics.sh --delete will apparently not delete the corresponding local files for that topic. I am not sure whether this behavior is expected or not.

Start a producer

Start a console producer in sync mode:

Start a console producer in sync mode
$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --sync \
    --topic zerg.hydra

Example producer output:

[...] INFO Verifying properties (kafka.utils.VerifiableProperties)
[...] INFO Property broker.list is overridden to localhost:9092,localhost:9093,localhost:9094 (...)
[...] INFO Property compression.codec is overridden to 0 (kafka.utils.VerifiableProperties)
[...] INFO Property key.serializer.class is overridden to kafka.serializer.StringEncoder (...)
[...] INFO Property producer.type is overridden to sync (kafka.utils.VerifiableProperties)
[...] INFO Property queue.buffering.max.messages is overridden to 10000 (...)
[...] INFO Property queue.buffering.max.ms is overridden to 1000 (kafka.utils.VerifiableProperties)
[...] INFO Property queue.enqueue.timeout.ms is overridden to 0 (kafka.utils.VerifiableProperties)
[...] INFO Property request.required.acks is overridden to 0 (kafka.utils.VerifiableProperties)
[...] INFO Property request.timeout.ms is overridden to 1500 (kafka.utils.VerifiableProperties)
[...] INFO Property send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[...] INFO Property serializer.class is overridden to kafka.serializer.StringEncoder (...)

You can now enter new messages, one per line. Here we enter two messages “Hello, world!” and “Rock: Nerf Paper. Scissors is fine.”:

Hello, world!
Rock: Nerf Paper. Scissors is fine.

After the messages are produced, you should see the data being replicated to the three log directories for each of the broker instances, i.e. /tmp/kafka-logs-{1,2,3}/zerg.hydra-*/.
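
You can spot-check this on the filesystem: after producing a few messages, the .log segment files of the replicas should no longer be empty. A quick sketch:

# the segment files should now have a non-zero size on every broker hosting a replica
$ ls -lh /tmp/kafka-logs-*/zerg.hydra-*/*.log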

Start a consumer

Start a console consumer that reads messages in zerg.hydra from the beginning (in a production setting you would usually NOT want to add the --from-beginning option):

Start a console consumer
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zerg.hydra --from-beginning

The consumer will see a new message whenever you enter a message in the producer above.

Example consumer output:

<snipp>
[...] INFO [console-consumer-28434_panama.local-1363174829799-954ed29e], Connecting to zookeeper instance at localhost:2181 ...
[...] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[...] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT ...
[...] INFO Client environment:host.name=192.168.0.153 (org.apache.zookeeper.ZooKeeper)
<snipp>
[...] INFO Fetching metadata with correlation id 0 for 1 topic(s) Set(zerg.hydra) (kafka.client.ClientUtils$)
[...] INFO Connected to 192.168.0.153:9092 for producing (kafka.producer.SyncProducer)
[...] INFO Disconnecting from 192.168.0.153:9092 (kafka.producer.SyncProducer)
[...] INFO [ConsumerFetcherThread-console-consumer-28434_panama.local-1363174829799-954ed29e-0-3], Starting ...
[...] INFO [ConsumerFetcherManager-1363174829916] adding fetcher on topic zerg.hydra, partion 2, initOffset -1 to broker 3 with fetcherId 0 ...
[...] INFO [ConsumerFetcherThread-console-consumer-28434_panama.local-1363174829799-954ed29e-0-2], Starting ...
[...] INFO [ConsumerFetcherManager-1363174829916] adding fetcher on topic zerg.hydra, partion 1, initOffset -1 to broker 2 with fetcherId 0 ...
[...] INFO [ConsumerFetcherThread-console-consumer-28434_panama.local-1363174829799-954ed29e-0-1], Starting ...
[...] INFO [ConsumerFetcherManager-1363174829916] adding fetcher on topic zerg.hydra, partion 0, initOffset -1 to broker 1 with fetcherId 0 ...

And at the end of the output you will see the following messages:

Hello, world!
Rock: Nerf Paper. Scissors is fine.

That’s it!

A note when using Kafka with Storm

The maximum parallelism you can have on a KafkaSpout is the number of partitions of the corresponding Kafka topic. The following question-answer thread (Michael G. Noll slightly modified the original text for clarification purposes) is from the Storm user mailing list, but it supposedly refers to Kafka pre-0.8, i.e. before the replication feature was added:

Question: Suppose the number of Kafka partitions per broker is configured as 1 and the number of hosts is 2. If we set the spout parallelism as 10, then how does Storm handle the difference between the number of Kafka partitions and the number of spout tasks? Since there are only 2 partitions, does every other spout task (greater than first 2) not read the data or do they read the same data?

Answer (by Nathan Marz): The remaining 8 (= 10 – 2) spout tasks wouldn’t read any data from the Kafka topic.

My current understanding is that the number of partitions (regardless of replicas) is still the limiting factor for the parallelism of a KafkaSpout. Why? Because Kafka does not allow consumers to read from any replica other than a partition’s leader, which simplifies concurrent access to the data in Kafka.

A note when using Kafka with Hadoop

LinkedIn has published their Kafka->HDFS pipeline named Camus. It is a MapReduce job that does distributed data loads out of Kafka.

Where to go from here

The following documents provide plenty of information about Kafka that goes way beyond what I covered in this article: