Hello Mesos
15 Nov 2014
There’s always some new piece of software to be excited about and right now it’s Apache Mesos. Mesos is a cluster management system intended to make wrangling large numbers of machines less of a headache. Unfortunately, finding a more detailed explanation of how it actually works can be challenging. The official site says:
Apache Mesos abstracts CPU, memory, storage, and other compute resources away from machines (physical or virtual), enabling fault-tolerant and elastic distributed systems to easily be built and run effectively.
Wow that sounds great! But how do I use it?
The Mesosphere website has a lot of language like:
Grow to tens of thousands of nodes effortlessly while dynamically allocating resources with ease.
Again, sounds really cool! But I still have no idea how to actually program against it.
Mesos has a lot of hype right now, so most descriptions of it tend to be vague and full of buzzwords and hyperbole1. It’s difficult to find any good examples showing how it actually works. Here I’ll detail my quest to do just that, and explain the code for my Hello World Mesos framework, which is hopefully one such example.
With a bit more googling, it’s not too difficult to find 10,000 foot view explanations of the mechanics of Mesos. The architecture guide on the official site is OK. This talk from David Greenberg is a really good explanation of the basic concepts. The Mesos Paper is also quite readable and does a good job justifying the system’s design.
The core abstraction of Mesos is a framework, which consists of a scheduler and an executor. A scheduler coordinates distributing some work, and an executor does some part of that work. Many familiar systems fit into this pattern. Consider Hadoop (the JobTracker is a scheduler and a TaskTracker is an executor), traditional HPC cluster management systems like TORQUE (pbs_sched is a scheduler, pbs_mom is an executor), or a web application deployment system (the deploy server is a scheduler, the application itself is an executor).
The goal of Mesos is to abstract the machines in a cluster away from all these frameworks that have to run on it. The way this works is that the cluster is entirely managed by Mesos—all machines in the cluster are configured as Mesos slaves2, which register with a Mesos master. The Mesos master makes resource offers (which are basically descriptions of an available Mesos slave) to the framework schedulers, which can claim those offers for their executors. Mesos then handles actually launching executors on slaves, communicating executor status back to the framework scheduler, etc. What’s cool about this is that once you have it set up, you can reallocate resources among the various frameworks running in your cluster by simply tweaking the settings that determine how Mesos allocates resource offers between frameworks, rather than having to manually reconfigure your machines.
What’s left unsaid in all of the above, however, is how to actually write a Mesos framework. Googling for “mesos hello world” yields this, which is pretty neat, but I don’t know Scala, I’ve heard Mesos has a Python API, and I really like using Python when I’m learning how to do something new. Looking for Python examples yields this, but for some reason its author includes extraneous discussion of genetic programming, monoids, HDFS, route planning, and a bazillion other things that only serve to obscure the main point.
I just wanted a simple hello world example in Python, which is exactly what I’ll walk you through now. All of the code that follows assumes Mesos version 0.20.0:
import logging
import uuid
import time

from mesos.interface import Scheduler
from mesos.native import MesosSchedulerDriver
from mesos.interface import mesos_pb2

logging.basicConfig(level=logging.INFO)


def new_task(offer):
    task = mesos_pb2.TaskInfo()
    id = uuid.uuid4()
    task.task_id.value = str(id)
    task.slave_id.value = offer.slave_id.value
    task.name = "task {}".format(str(id))

    cpus = task.resources.add()
    cpus.name = "cpus"
    cpus.type = mesos_pb2.Value.SCALAR
    cpus.scalar.value = 1

    mem = task.resources.add()
    mem.name = "mem"
    mem.type = mesos_pb2.Value.SCALAR
    mem.scalar.value = 1

    return task


class HelloWorldScheduler(Scheduler):

    def registered(self, driver, framework_id, master_info):
        logging.info("Registered with framework id: {}".format(framework_id))

    def resourceOffers(self, driver, offers):
        logging.info("Received resource offers: {}".format([o.id.value for o in offers]))
        # whenever we get an offer, we accept it and use it to launch a task
        # that just echoes hello world to stdout
        for offer in offers:
            task = new_task(offer)
            task.command.value = "echo hello world"
            time.sleep(2)
            logging.info("Launching task {task} "
                         "using offer {offer}.".format(task=task.task_id.value,
                                                       offer=offer.id.value))
            tasks = [task]
            driver.launchTasks(offer.id, tasks)


if __name__ == '__main__':
    # make us a framework
    framework = mesos_pb2.FrameworkInfo()
    framework.user = ""  # Have Mesos fill in the current user.
    framework.name = "hello-world"
    driver = MesosSchedulerDriver(
        HelloWorldScheduler(),
        framework,
        "zk://localhost:2181/mesos"  # assumes running on the master
    )
    driver.run()
Let’s step through it one piece at a time. We start with a fairly typical slew of standard library imports
import logging
import uuid
import time
before importing the relevant parts of the mesos library:
from mesos.interface import Scheduler
from mesos.native import MesosSchedulerDriver
from mesos.interface import mesos_pb2
Unfortunately mesos is not pip-installable at the moment. The best way to use it is to easy_install pre-built eggs for your platform, which can be obtained from the Mesosphere downloads page.
There are two classes involved here. We subclass the Scheduler class and implement callbacks containing the logic of our framework. These callbacks are invoked when messages are received from the Mesos master. The MesosSchedulerDriver handles all communication with the master; the scheduler delegates to the driver when it has something to communicate. mesos_pb2 contains protobuf definitions that we’ll need (Mesos uses protobufs for network communication).
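If you’re curious what’s inside these messages, the generated protobuf classes are easy to poke at. A quick illustration (not part of the framework itself; it just assumes the mesos egg is installed):

from mesos.interface import mesos_pb2

info = mesos_pb2.FrameworkInfo()
info.user = ""  # user and name are required fields
info.name = "hello-world"
# protobuf messages print as readable text, which is handy for debugging
print(info)
# SerializeToString() produces the wire format Mesos actually sends
print(repr(info.SerializeToString()))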
Next we set up some quick logging
logging.basicConfig(level=logging.INFO)
and declare our scheduler class.
class HelloWorldScheduler(Scheduler):
We’re going to write a scheduler called HelloWorldScheduler. This scheduler is very simple: whenever it gets a resource offer from the Mesos master, it uses the offer to launch an executor that runs echo hello world. Inheriting from mesos.interface.Scheduler gives us stub implementations of the Mesos API methods we aren’t going to override. You can see a complete list of these methods with their descriptions here. The first one is registered:
def registered(self, driver, framework_id, master_info):
    logging.info("Registered with framework id: {}".format(framework_id))
registered is a method that gets invoked when this framework registers with the Mesos master. We get passed the id we’ve been assigned (framework_id) and a protobuf containing information about the master we’re registered with (master_info). We’re just logging that this happened, but you could imagine setting up stateful resources (e.g. database connections) here.
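For instance, here’s a sketch of a scheduler that opens a (purely illustrative) sqlite connection once registration succeeds; StatefulHelloWorldScheduler is hypothetical, not part of the example:

import sqlite3

class StatefulHelloWorldScheduler(Scheduler):
    def registered(self, driver, framework_id, master_info):
        logging.info("Registered with framework id: {}".format(framework_id))
        # now that we know registration succeeded, set up state
        # that the other callbacks will need
        self.db = sqlite3.connect("/tmp/tasks.db")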
Next we implement resourceOffers, which is a method that gets invoked when the scheduler receives resource offers from the Mesos master.
def resourceOffers(self, driver, offers):
    logging.info("Received resource offers: {}".format([o.id.value for o in offers]))
    # whenever we get an offer, we accept it and use it to launch a task
    # that just echoes hello world to stdout
    for offer in offers:
        task = new_task(offer)
        task.command.value = "echo hello world"
        time.sleep(2)
        logging.info("Launching task {task} "
                     "using offer {offer}.".format(task=task.task_id.value,
                                                   offer=offer.id.value))
        tasks = [task]
        driver.launchTasks(offer.id, tasks)
This is the meat of the scheduler. Let’s step through it a few lines at a time:
def resourceOffers(self, driver, offers):
resourceOffers is passed the driver that our scheduler is being run by, as well as offers (which is a list of protobufs, each of which contains information about an offer). Remember that the scheduler class only contains our framework-specific logic and delegates all communication with Mesos to the driver. This is why we are passed the driver in this method: we’ll need to tell the Mesos master what we want to do with these offers3.
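Accepting an offer is only one of our options; the driver is also how we’d turn an offer down so Mesos can give it to another framework. A sketch of what that would look like (our hello world framework never actually declines anything):

def resourceOffers(self, driver, offers):
    for offer in offers:
        # declineOffer tells the master we don't want this offer,
        # freeing it to be offered to other frameworks
        driver.declineOffer(offer.id)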
for offer in offers:
    task = new_task(offer)
We iterate over the offers received, creating a new task for each. Let’s look at the implementation of new_task:
def new_task(offer):
    task = mesos_pb2.TaskInfo()
    id = uuid.uuid4()
    task.task_id.value = str(id)
    task.slave_id.value = offer.slave_id.value
    task.name = "task {}".format(str(id))

    cpus = task.resources.add()
    cpus.name = "cpus"
    cpus.type = mesos_pb2.Value.SCALAR
    cpus.scalar.value = 1

    mem = task.resources.add()
    mem.name = "mem"
    mem.type = mesos_pb2.Value.SCALAR
    mem.scalar.value = 1

    return task
We instantiate a new TaskInfo protobuf4 and fill it in with some basic details (a unique id, the id of the slave we want to use, and a name). We then request 1 CPU and 1 megabyte of memory. We aren’t actually checking to make sure the offer contains these resources, but it probably does (it’s quite a modest request), and we could make sure if we wanted to by inspecting the offer.resources list, as sketched below.
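A minimal sketch of such a check (the helper name is mine, not part of the Mesos API):

def offer_is_big_enough(offer, wanted_cpus=1, wanted_mem=1):
    # scan the offer's resource list for the scalar cpus/mem entries
    cpus = mem = 0
    for resource in offer.resources:
        if resource.name == "cpus":
            cpus = resource.scalar.value
        elif resource.name == "mem":
            mem = resource.scalar.value
    return cpus >= wanted_cpus and mem >= wanted_mem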
We then return the protobuf. OK, let’s jump back to resourceOffers:
task.command.value = "echo hello world"
Now that we’ve created a generic task protobuf, we fill in its command field with what we actually want the task to do, in this case simply echo hello world.
time.sleep(2)
logging.info("Launching task {task} "
             "using offer {offer}.".format(task=task.task_id.value,
                                           offer=offer.id.value))
tasks = [task]
driver.launchTasks(offer.id, tasks)
We then sleep for 2 seconds (which is there just so it’s easier to watch the framework run in real time), log that we’re about to launch the task, and do so by calling driver.launchTasks with the id of the offer we want to use and the list of tasks we want to launch on it5.
Anyway, that’s the entirety of our scheduler class! Now we just need to start up a driver and connect to the Mesos master.
if __name__ == '__main__':
    framework = mesos_pb2.FrameworkInfo()
    framework.user = ""  # Have Mesos fill in the current user.
    framework.name = "hello-world"
    driver = MesosSchedulerDriver(
        HelloWorldScheduler(),
        framework,
        "zk://localhost:2181/mesos"  # assumes running on the master
    )
    driver.run()
The MesosSchedulerDriver takes three parameters: an instance of something that implements the Scheduler interface (in our case a HelloWorldScheduler), a FrameworkInfo protobuf (which has things like the id and name of this framework), and a string containing the Zookeeper URI of the Mesos cluster we want the framework to register with6. Note that in this code, we assume that the Zookeeper instance is running on the same machine that the framework is being started on (in general this doesn’t have to be the case). The driver we instantiate here is the object that gets passed to the scheduler’s callbacks to allow it to communicate with Mesos.
After instantiating the driver, we call driver.run() to start the framework.
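Note that driver.run() blocks until the driver stops. If you want to be able to kill the framework cleanly with Ctrl-C, one option (a sketch; the code above doesn’t bother) is:

import sys

try:
    driver.run()
except KeyboardInterrupt:
    # stop() unregisters the framework from the master before we exit
    driver.stop()
    sys.exit(1)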
I ran this code using python hello_mesos.py on a small Mesos cluster with a single master and a single slave. Here’s the result:
I1116 15:54:31.813361 27339 sched.cpp:139] Version: 0.20.0
2014-11-16 15:54:31,813:27339(0x7fe781a9d700):ZOO_INFO@log_env@712: Client environment:zookeeper.version=zookeeper C client 3.4.5
2014-11-16 15:54:31,815:27339(0x7fe781a9d700):ZOO_INFO@log_env@716: Client environment:host.name=mesos-master.novalocal
2014-11-16 15:54:31,816:27339(0x7fe781a9d700):ZOO_INFO@log_env@723: Client environment:os.name=Linux
2014-11-16 15:54:31,816:27339(0x7fe781a9d700):ZOO_INFO@log_env@724: Client environment:os.arch=3.13.0-32-generic
2014-11-16 15:54:31,817:27339(0x7fe781a9d700):ZOO_INFO@log_env@725: Client environment:os.version=#57-Ubuntu SMP Tue Jul 15 03:51:08 UTC 2014
2014-11-16 15:54:31,818:27339(0x7fe781a9d700):ZOO_INFO@log_env@733: Client environment:user.name=ubuntu
2014-11-16 15:54:31,819:27339(0x7fe781a9d700):ZOO_INFO@log_env@741: Client environment:user.home=/home/ubuntu
2014-11-16 15:54:31,820:27339(0x7fe781a9d700):ZOO_INFO@log_env@753: Client environment:user.dir=/home/ubuntu
2014-11-16 15:54:31,820:27339(0x7fe781a9d700):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=localhost:2181 sessionTimeout=10000 watcher=0x7fe782eefa90 sessionId=0 sessionPasswd=<null> context=0x7fe774000930 flags=0
2014-11-16 15:54:31,823:27339(0x7fe76ffff700):ZOO_INFO@check_events@1703: initiated connection to server [127.0.0.1:2181]
2014-11-16 15:54:31,826:27339(0x7fe76ffff700):ZOO_INFO@check_events@1750: session establishment complete on server [127.0.0.1:2181], sessionId=0x149b22cdfe600a8, negotiated timeout=10000
I1116 15:54:31.826449 27350 group.cpp:313] Group process (group(1)@172.16.1.34:59733) connected to ZooKeeper
I1116 15:54:31.826498 27350 group.cpp:787] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0)
I1116 15:54:31.826529 27350 group.cpp:385] Trying to create path '/mesos' in ZooKeeper
I1116 15:54:31.830415 27350 detector.cpp:138] Detected a new leader: (id='0')
I1116 15:54:31.831310 27350 group.cpp:658] Trying to get '/mesos/info_0000000000' in ZooKeeper
I1116 15:54:31.833284 27350 detector.cpp:426] A new leading master (UPID=master@172.16.1.34:5050) is detected
I1116 15:54:31.834072 27350 sched.cpp:235] New master detected at master@172.16.1.34:5050
I1116 15:54:31.835058 27350 sched.cpp:243] No credentials provided. Attempting to register without authentication
I1116 15:54:31.838003 27349 sched.cpp:409] Framework registered with 20141115-003844-570495148-5050-6532-0067
INFO:root:Registered with framework id: value: "20141115-003844-570495148-5050-6532-0067"
INFO:root:Received resource offers: [u'20141115-003844-570495148-5050-6532-387']
INFO:root:Got a resource offer.
INFO:root:Launching task bcebedef-6e37-450d-99ca-84206a5385de using offer 20141115-003844-570495148-5050-6532-387.
INFO:root:Received resource offers: [u'20141115-003844-570495148-5050-6532-388']
INFO:root:Got a resource offer.
INFO:root:Launching task 197bcdd0-91e0-4647-a271-00356c6e1133 using offer 20141115-003844-570495148-5050-6532-388.
INFO:root:Received resource offers: [u'20141115-003844-570495148-5050-6532-389']
INFO:root:Got a resource offer.
INFO:root:Launching task 376f65f6-f7c2-4e0a-8307-a7c489348a4e using offer 20141115-003844-570495148-5050-6532-389.
. . .
The log messages go on forever as the scheduler continuously receives a single resource offer (corresponding to the single slave) and launches echo hello world on it.
We can see that the task is running successfully by sshing into the slave and checking the stdout of one of the tasks (all of which are saved in logfiles):
ubuntu@mesos-slave:~$ cat /tmp/mesos/slaves/20141115-003844-570495148-5050-6532-0/frameworks/20141115-003844-570495148-5050-6532-0067/executors/bcebedef-6e37-450d-99ca-84206a5385de/runs/latest/stdout
Registered executor on mesos-slave.novalocal
Starting task bcebedef-6e37-450d-99ca-84206a5385de
Forked command at 13319
sh -c 'echo hello world'
hello world
Command exited with status 0 (pid: 13319)
Ta-da! A Mesos framework in 60 lines of code.
Hopefully this helps clarify how the system works for others who were confused.
This isn’t helped by the fact that people deploying a Mesos cluster don’t usually have to program directly against it themselves, but rather just use frameworks already written by other people—knowing the details of what’s going on underneath is an afterthought. ↩
I feel uncomfortable using the master/slave terminology and agree with the Django team that it’s bad, but it’s the official Mesos terminology, so in order to avoid confusion I defer and reluctantly describe things this way. ↩
As far as where the driver comes from in the first place, we’ll get there, be patient. ↩
By the way, you can read all of the Mesos protobuf definitions here. This can be pretty useful for figuring out what your options are when communicating using them, as well as just how the system works in general. Some of the comments are very handy, it’s a shame they’re buried here rather than displayed prominently in the documentation. ↩
One confusing bit here is the seemingly useless creation of a reference (tasks) to the list containing the single task we want to launch. It would seem simpler to just do driver.launchTasks(offer.id, [task]). I am not sure why this happens, but when I do that, the driver’s Python-C++ interface crashes here. I suspect some sort of GC issue. ↩
Mesos uses Zookeeper for state tracking and leader election. ↩