Sunday, June 16, 2013

The Pragmatic Programmer

I've just finished reading The Pragmatic Programmer by Hunt & Thomas. I'll try to be pragmatic and keep it brief; I liked the book because it is
  • timeless - talks about patterns and ideas as opposed to domain-specific tips'n'tricks,
  • well-written - no waffle or forced jokes; the reader can easily relate,
  • to the point - the book is well structured and every chapter holds some value for the reader.
The authors discuss critical aspects and problems in software engineering: testing, estimation, orthogonality, "programming by coincidence", refactoring. A really good read, The Pragmatic Programmer was time well spent.

Tuesday, February 26, 2013

Interactive subprocess communication in #Python

The subprocess.Popen class comes with a communicate() method, which sends input to a child process, then waits for it to terminate and returns its standard output and standard error to the calling process. 

The limitation of communicate() is that once it has finished writing input, it closes the child's stdin descriptor; from the subprocess's point of view, this is like receiving end-of-file (ctrl+D). This is not always desirable, e.g. when you try to automate a process that involves communicating through an interactive shell, such as talking to a secured database or to a hardware device via ssh through a bastion host.

Two problems to consider here:
  1. How to avoid blocking on I/O while reading the output?
  2. When the output is large (multiple blocks), how to decide when the transmission of the message has completed and when to return the output?
The first problem is addressed by making the file descriptors for stdout and stderr non-blocking. By default, a read from a descriptor that has no data ready blocks: read() only returns once data finally arrives. On a non-blocking descriptor, read() returns whatever data is available and, if there is none, raises IOError instead of waiting.
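For reference, here is a minimal Python 3 sketch of the same mechanism on a bare pipe, independent of subprocess. In Python 3 the error raised by a non-blocking read with no data is BlockingIOError, a subclass of OSError (IOError is just an alias of OSError there). The helper names set_nonblocking and try_read are my own, and fcntl is Unix-only.

```python
import errno
import fcntl
import os


def set_nonblocking(fd):
    """Add O_NONBLOCK to a file descriptor's status flags."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def try_read(fd, size=4096):
    """Return the bytes available right now, or None if nothing is ready."""
    try:
        return os.read(fd, size)
    except OSError as error:  # BlockingIOError is a subclass of OSError
        if error.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
            return None
        raise


if __name__ == '__main__':
    read_fd, write_fd = os.pipe()
    set_nonblocking(read_fd)
    print(try_read(read_fd))   # the pipe is empty, yet the call returns at once
    os.write(write_fd, b'ready> ')
    print(try_read(read_fd))
    os.close(read_fd)
    os.close(write_fd)
```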

One way to work around the second problem is to use a feature of most interactive shells: the prompt, which signals the "ready>" state. Once the output ends with the prompt, you know it is complete and can be returned.
Consider the following snippet of code:
# ipopen.py
import os
import time
import fcntl
import subprocess

class IPopen(subprocess.Popen):

    POLL_INTERVAL = 0.1
    def __init__(self, *args, **kwargs):
        """Construct interactive Popen."""
        keyword_args = {
            'stdin': subprocess.PIPE,
            'stdout': subprocess.PIPE,
            'stderr': subprocess.PIPE,
            'prompt': '> ',
        }
        keyword_args.update(kwargs)
        self.prompt = keyword_args.pop('prompt')
        subprocess.Popen.__init__(self, *args, **keyword_args)
        # Make stderr and stdout non-blocking.
        for outfile in (self.stdout, self.stderr):
            if outfile is not None:
                fd = outfile.fileno()
                fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

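    def correspond(self, text, sleep=POLL_INTERVAL):
        """Communicate with the child process without closing stdin."""
        self.stdin.write(text)
        self.stdin.flush()
        str_buffer = ''
        while not str_buffer.endswith(self.prompt):
            try:
                chunk = self.stdout.read()
            except IOError:
                # Nothing to read yet on the non-blocking descriptor.
                chunk = None
            if chunk:
                str_buffer += chunk
            elif chunk == '' and self.poll() is not None:
                # End of file: the child has exited, so no prompt will follow.
                raise EOFError('child process exited before printing the prompt')
            else:
                time.sleep(sleep)
        return str_buffer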

The above is a minimal extension of subprocess.Popen that lets you exchange messages with a process, just as you would from its interactive shell:

$ ipython
Python 2.7.3 (default, Aug  1 2012, 05:14:39) 
Type "copyright", "credits" or "license" for more information.

IPython 0.12.1 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.

In [1]: from ipopen import IPopen

In [2]: gdb = IPopen(['gdb'], prompt='(gdb) ')

In [3]: print gdb.correspond('')
GNU gdb (Ubuntu/Linaro 7.4-2012.04-0ubuntu2.1) 7.4-2012.04
Copyright (C) 2012 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-linux-gnu".
For bug reporting instructions, please see:

(gdb) 

In [4]: print gdb.correspond('help\n')
List of classes of commands:

aliases -- Aliases of other commands
breakpoints -- Making program stop at certain points
data -- Examining data
files -- Specifying and examining files
internals -- Maintenance commands
obscure -- Obscure features
running -- Running the program
stack -- Examining the stack
status -- Status inquiries
support -- Support facilities
tracepoints -- Tracing of program execution without stopping the program
user-defined -- User-defined commands

Type "help" followed by a class name for a list of commands in that class.
Type "help all" for the list of all commands.
Type "help" followed by command name for full documentation.
Type "apropos word" to search for commands related to "word".
Command name abbreviations are allowed if unambiguous.
(gdb) 

In [5]: 
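On Python 3, the same prompt-terminated exchange could be sketched with the selectors module, which waits until the child's stdout is readable instead of sleeping after a failed read. This is a hedged adaptation rather than a drop-in replacement for IPopen: the PromptSession class, its timeout, and the ECHO_SHELL child (a hypothetical stand-in for gdb that uses "> " as its prompt) are all my own, it works with bytes, and it leaves stderr unpiped.

```python
import os
import selectors
import subprocess
import sys
import time

# Hypothetical stand-in for an interactive shell: prints "> " as its prompt
# and echoes back every line it receives.
ECHO_SHELL = r'''
import sys
sys.stdout.write("> ")
sys.stdout.flush()
while True:
    line = sys.stdin.readline()
    if not line:
        break
    sys.stdout.write("you said: " + line + "> ")
    sys.stdout.flush()
'''


class PromptSession:
    """Exchange messages with a child process, reading up to its prompt."""

    def __init__(self, args, prompt=b'> ', timeout=10.0):
        self.prompt = prompt
        self.timeout = timeout
        self.proc = subprocess.Popen(args, stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE)
        self.selector = selectors.DefaultSelector()
        self.selector.register(self.proc.stdout, selectors.EVENT_READ)

    def correspond(self, text=b''):
        """Send text without closing stdin; return output up to the prompt."""
        if text:
            self.proc.stdin.write(text)
            self.proc.stdin.flush()
        buffer = b''
        deadline = time.monotonic() + self.timeout
        fd = self.proc.stdout.fileno()
        while not buffer.endswith(self.prompt):
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError('no prompt within %.1f s' % self.timeout)
            if not self.selector.select(remaining):
                continue  # nothing arrived in time; the check above raises
            # os.read() returns whatever is available (at least one byte)
            # instead of waiting for a full buffer; b'' means end of file.
            chunk = os.read(fd, 4096)
            if not chunk:
                raise EOFError('child exited before printing the prompt')
            buffer += chunk
        return buffer

    def close(self):
        self.selector.close()
        self.proc.stdin.close()  # the child now sees end-of-file
        self.proc.wait()
        self.proc.stdout.close()


if __name__ == '__main__':
    session = PromptSession([sys.executable, '-c', ECHO_SHELL])
    print(session.correspond())
    print(session.correspond(b'hello\n'))
    session.close()
```

Reading with os.read() on the raw descriptor, rather than through proc.stdout.read(), avoids leaving data sitting in Python's buffered reader where select() cannot see it.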

Saturday, February 16, 2013

The impact of #JVM garbage collection on response time

When analyzing the responsiveness of Java applications, it's always a good idea to start by looking at GC activity. 

It makes sense to look at JMX GC stats first, but if you don't see anything obvious, don't rule out GC as the culprit just yet! Your JMX GC stats may simply not be granular enough: the impact of GC pauses can go unnoticed in a time series with 1-minute granularity. Only digging into the GC logs will give a conclusive answer.

Now, GC log timestamps aren't easy to work with, as they represent JVM uptime in seconds rather than a standard Unix timestamp or formatted date, as is the case with pretty much every other log format.

An example entry goes as follows (read Poonam Bajaj's post on how to decipher them):
 
39.910: [GC 39.910: [ParNew: 261760K->0K(261952K), 0.2314667 secs] 262017K->26386K(1048384K), 0.2318679 secs]

Finding out the creation time of a process on Linux is simple (stat /proc/<jvm pid>), but the JVM takes some time to bootstrap itself, so its uptime clock starts roughly 100 ms after the creation time shown in the /proc filesystem. This means GC log timestamps need to be adjusted accordingly to reflect timing more accurately; in my case, by about 138 ms.

precise GC time = GC logstamp + JVM process start + JVM bootstrap overhead
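The formula can be sketched in Python as a small parser for log lines like the one above. It rests on a few assumptions: the regular expression only covers lines that start with the uptime stamp and end with the total pause in "secs]", the process start time is passed in as Unix seconds (obtained, e.g., via stat /proc/<jvm pid> as described), and the bootstrap overhead is a value you measure yourself, such as the roughly 0.138 s from my case, not a universal constant. The function name is hypothetical.

```python
import re

# Leading JVM-uptime stamp ("39.910:") and the total pause, taken to be the
# last "<seconds> secs]" on the line. Other GC log layouts need other patterns.
GC_LINE = re.compile(r'^(?P<uptime>\d+\.\d+):.*?(?P<pause>\d+\.\d+) secs\]\s*$')


def precise_gc_event(line, jvm_process_start, bootstrap_overhead):
    """Return (GC start as Unix time in seconds, pause in seconds), or None.

    precise GC time = GC logstamp + JVM process start + JVM bootstrap overhead
    """
    match = GC_LINE.match(line)
    if match is None:
        return None
    uptime = float(match.group('uptime'))
    pause = float(match.group('pause'))
    return jvm_process_start + bootstrap_overhead + uptime, pause


if __name__ == '__main__':
    sample = ('39.910: [GC 39.910: [ParNew: 261760K->0K(261952K), 0.2314667 secs] '
              '262017K->26386K(1048384K), 0.2318679 secs]')
    # Hypothetical process start time; 0.138 s is the overhead from my case.
    print(precise_gc_event(sample, 1360000000.0, 0.138))
```

For the sample entry and a hypothetical process start of 1360000000.0, this gives a GC start of about 1360000040.048 and a pause of 0.2318679 s.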

At millisecond granularity, correlating precise GC times with the application's request logs makes it possible to assess how much impact the garbage collector has on the app.

The following is an example of a millisecond-granularity breakdown of open requests to a high-throughput application during a sample one-minute period. During normal operation, no more than 5 requests are being served at the same time.
Request data can be generated by extracting the start time and duration of each request from the logs. Data on when and for how long GC is taking place can be put together the same way.

Looking at the plot of GC activity (pink = activity, white = no activity), the spikes seem to roughly correspond to the collector being at work.
The correlation becomes clearly visible when the GC and request data are overlaid and zoomed in on, as below, to the 6-second period between seconds 14 and 20.
Look at what happens: requests are being enqueued but not processed while the collector is running. When they are finally processed, each takes less than a millisecond. As a result, if the average response time is calculated from request durations in the logs, one might feel pretty comfortable about it being low (the application really does think each request took <1 ms), while in reality some requests are held up for about 300 milliseconds, which is how long it takes for an external caller to get an answer.
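The arithmetic behind this last point can be sketched as follows, assuming a request that arrives during a pause waits in a queue until the pause ends and then takes its logged duration. Arrival times as seen by the caller are not in the application's own logs, so the arrival_ms input here is hypothetical; it would have to come from client-side or upstream logs.

```python
def caller_latency_ms(arrival_ms, logged_duration_ms, gc_pauses):
    """Latency seen by the caller, given GC pauses as (start_ms, end_ms).

    A request arriving inside a pause is assumed to be queued until the
    pause ends, after which it takes logged_duration_ms to process.
    """
    wait = 0
    for start, end in gc_pauses:
        if start <= arrival_ms < end:
            wait = end - arrival_ms
            break
    return wait + logged_duration_ms


if __name__ == '__main__':
    pauses = [(14000, 14300)]
    print(caller_latency_ms(14000, 0.5, pauses))  # arrives as the pause begins
    print(caller_latency_ms(13000, 0.5, pauses))  # no pause in the way
```

A request arriving right at the start of a 300 ms pause and logging 0.5 ms is seen by its caller as taking 300.5 ms, which is exactly the gap between the two views described above.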

Sunday, February 10, 2013

Simple Workflow example with boto.swf: the #MTurk Requester

In this post I'm going to demonstrate the application of Amazon Simple Workflow using Eric's suggestion from the original boto.swf post as an example.

As a resident of Europe, I'm unable to use MTurk as a requester, so my assumptions about the MTurk Requester and its operation come from the code in boto.mturk. I'm hoping that the workflow presented below is simple and generic enough to be easily tweaked as necessary.

The scenario goes as follows: the requester publishes a HIT group and waits for solutions from individual human workers to start coming in. As the solutions arrive, they're subjected to a number of validity checks (such as the rules listed in the HIT description). If the checks pass, the requester wants to approve the assignment and reward the human worker. Otherwise, the assignment is marked as rejected and may resurface on the MTurk website for other workers. When all assignments are completed, the HIT group may be closed and the workflow finishes.

So first, let's model this scenario as a workflow, starting with identifying the activities. I see five activities here, with [nicknames] in square brackets:
  • Receive a completed assignment from a worker [ReceiveAssignment],
  • verify its correctness [VerifyAssignment],
  • approve when correct [ApproveAssignment],
  • reject otherwise [RejectAssignment],
  • see how many are left to do [CountOpenAssignments].
Now let's think about the control flow that builds the business logic out of these blocks:
  • poll for a completed assignment and on arrival of a solution
  • validate it
    • if the solution is valid,
      • approve and reward the worker,
      • count remaining open tasks
        • if none are left, then close the workflow.
    • if, on the other hand, the checks fail, 
      • reject solution and
      • post the assignment once again,
  • poll for the next assignment (goto start)
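Before touching SWF, the control flow above can be sketched as a plain-Python transition function, which also makes it easy to dry-run locally. The next_step and dry_run names, and passing results in already decoded, are illustrative assumptions; the decider further down encodes the same branches against the SWF event history.

```python
def next_step(completed_activity, result=None):
    """Return the next activity to schedule, or None to close the workflow.

    completed_activity is the name of the activity that just completed
    (None when the workflow has just started); result is its decoded output.
    """
    if completed_activity is None:
        return 'ReceiveAssignment'        # poll for a completed assignment
    if completed_activity == 'ReceiveAssignment':
        return 'VerifyAssignment'         # validate it
    if completed_activity == 'VerifyAssignment':
        return 'ApproveAssignment' if result['acceptable'] else 'RejectAssignment'
    if completed_activity == 'ApproveAssignment':
        return 'CountOpenAssignments'     # count remaining open tasks
    if completed_activity == 'CountOpenAssignments' and result == 0:
        return None                       # none left: close the workflow
    # After a rejection, or while assignments remain open: goto start.
    return 'ReceiveAssignment'


def dry_run(open_assignments, verdicts):
    """Walk the flow locally; verdicts are hypothetical verification outcomes."""
    verdicts = iter(verdicts)
    trace, step = [], next_step(None)
    while step is not None:
        trace.append(step)
        result = None
        if step == 'VerifyAssignment':
            result = {'acceptable': next(verdicts)}
        elif step == 'ApproveAssignment':
            open_assignments -= 1
        elif step == 'CountOpenAssignments':
            result = open_assignments
        step = next_step(step, result)
    return trace


if __name__ == '__main__':
    for step in dry_run(2, [True, False, True]):
        print(step)
```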

Before we start, we need to register the workflow and activity types with Amazon Simple Workflow Service. It's a one-off action and can be done from an interactive interpreter:
$ export AWS_ACCESS_KEY_ID=access_key
$ export AWS_SECRET_ACCESS_KEY=secret_key
$ ipython
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41)
Type "copyright", "credits" or "license" for more information.

IPython 0.10 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object'. ?object also works, ?? prints more.

In [1]: import boto.swf.layer2 as swf
In [2]: DOMAIN='mturk_requester'
In [3]: WF_NAME = 'ProcessHitGroup' 
In [4]: TASKLIST = WF_NAME + 'Tasks' 
In [5]: VERSION = '1.0'
In [6]: mturk_domain = swf.Domain(name=DOMAIN)
In [7]: mturk_domain.register()
In [8]: workflow = swf.WorkflowType(domain=DOMAIN, name=WF_NAME, version=VERSION, 
...                                 task_list=TASKLIST)

In [9]: workflow.register()
In [10]: for activity_name in ('ReceiveAssignment', 'VerifyAssignment', 'ApproveAssignment',
...                            'RejectAssignment', 'CountOpenAssignments'):
...    swf.ActivityType(domain=DOMAIN, name=activity_name, version=VERSION,
...                     task_list=TASKLIST).register()
 
# To verify successful registration:

In [11]: mturk_domain.workflows()
Out[11]: [<WorkflowType 'ProcessHitGroup-1.0' at 0x1e31550>]
In [12]: mturk_domain.activities()
Out[12]: 
[<ActivityType 'ApproveAssignment-1.0' at 0x1e31950>,
 <ActivityType 'CountOpenAssignments-1.0' at 0x1e31a90>,
 <ActivityType 'ReceiveAssignment-1.0' at 0x1e31c10>,
 <ActivityType 'RejectAssignment-1.0' at 0x1e31dd0>,
 <ActivityType 'VerifyAssignment-1.0' at 0x1e31f90>]

In order to minimize data duplication, I recommend creating a config.py file and populating it with constants.

# config.py
DOMAIN = 'mturk_requester'
WF_NAME = 'ProcessHitGroup'
TASKLIST = WF_NAME + 'Tasks'
VERSION = '1.0'

The Worker
You can run a dedicated activity worker for each type of activity, or you can implement a worker capable of handling several activity types and run a few such workers in parallel. Poly-task workers might not be viable for every kind of workflow, but they can increase robustness. Some of your workers may crash and never recover: if all the workers specialized in one particular activity die out, the workflow hangs, whereas a single surviving poly-task worker will still plough through the workflow and bring it to a successful close.
# mturk_worker.py
import activity_impl
import boto.swf.layer2 as swf
import json
from config import *

class MturkRequesterActivityWorker(swf.ActivityWorker):
    
    domain = DOMAIN
    version = VERSION
    task_list = TASKLIST

    def run(self):
        activity_task = self.poll()
        # poll() returns a dictionary. When no tasks are enqueued in SWF,
        # the connection is kept open for a minute (long poll) and the service
        # comes back with {...}. When an actual task is received, the dictionary 
        # contains 'activityId' key.
        print activity_task
        if 'activityId' in activity_task:
            # Look up the method implementing the requested activity.
            activity = {
                'ReceiveAssignment': self._receive_assignment,
                'VerifyAssignment': self._verify_assignment,
                'CountOpenAssignments': self._count_open_assignments,
                'RejectAssignment': self._reject_assignment,
                'ApproveAssignment': self._approve_assignment
            }.get(activity_task['activityType']['name'])

            # If anything happens, report the failure and its reason.
            try:
                activity(activity_task.get('input'))
            except Exception, error:
                self.fail(reason=str(error))
                # A bare raise re-raises with the original traceback.
                raise

            return True

    def _receive_assignment(self, _):
        # Input into this activity is disregarded.
        assignment = activity_impl.receive_assignment()
        self.complete(result=json.dumps(assignment))

    def _verify_assignment(self, assignment_contents):
        # It is assumed that verification may succeed or fail in which case
        # reason for a failure may be provided.
        assignment = json.loads(assignment_contents)
        acceptable, reason = activity_impl.verify_assignment(assignment['result'])
        self.complete(result=json.dumps({
            'acceptable': acceptable,
            'reason': reason,
            'assignment_id' : assignment['assignment_id']
        }))

    def _count_open_assignments(self, _):
        # Return assignment count. 
        self.complete(result=str(activity_impl.count_open_assignments()))

    def _reject_assignment(self, assignment_id):
        activity_impl.reject_assignment(assignment_id)
        # Optionally:
        activity_impl.reschedule_assignment(assignment_id)
        self.complete()

    def _approve_assignment(self, assignment_id):
        activity_impl.approve_assignment(assignment_id)
        self.complete()

worker = MturkRequesterActivityWorker()

The Decider
The thing to remember when implementing a decider is that a decider is meant to embody the business logic and the logic alone. In particular, the only external web service a decider should be calling is Amazon Simple Workflow. If a decider needs additional information to make a decision, even if getting it takes only a simple HTTP GET to a remote host, resist the urge to implement the call in the decider and instead delegate the operation to an activity worker as a minor task of its own.
# mturk_decider.py
import time
import json
from config import *
import boto.swf.layer2 as swf

class MturkRequesterDecider(swf.Decider):

    domain = DOMAIN
    task_list = TASKLIST
    version = VERSION

    def run(self):
        history = self.poll()
        print history
        if 'events' in history:
            decisions = swf.Layer1Decisions()
            # Decision* events are irrelevant here and can be ignored.
            workflow_events = [e for e in history['events'] 
                               if not e['eventType'].startswith('Decision')]
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task('ra-%i' % time.time(),
                                                 'ReceiveAssignment',
                                                 VERSION, task_list=TASKLIST)
            elif last_event_type == 'ActivityTaskCompleted':
                # Take decision based on the name of activity that has just completed.
                # 1. Get the name of the completed activity.
                # 1a. What is the event id of the completed activity? 
                completed_activity_id = last_event['activityTaskCompletedEventAttributes']['scheduledEventId'] - 1
                # 1b. What is the name of the completed activity?
                activity_data = history['events'][completed_activity_id]
                activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                activity_name = activity_attrs['activityType']['name']
                # 1c. Get the result from the activity, if any at all.
                result = last_event['activityTaskCompletedEventAttributes'].get('result')
                # Take the decision.
                if activity_name == 'ReceiveAssignment':
                    # Completed Assignment just came in. Verify it.
                    decisions.schedule_activity_task('va-%i' % time.time(),
                                                     'VerifyAssignment',
                                                     VERSION, task_list=TASKLIST,
                                                     input=result)
                elif activity_name == 'VerifyAssignment':
                    output = json.loads(result)
                    if output['acceptable']:
                        decisions.schedule_activity_task('aa-%i' % time.time(),
                            'ApproveAssignment', VERSION, task_list=TASKLIST,
                            input=str(output['assignment_id']))
                    else:
                        decisions.schedule_activity_task('da-%i' % time.time(),
                            'RejectAssignment', VERSION, task_list=TASKLIST,
                            input=str(output['assignment_id']))
                elif activity_name == 'ApproveAssignment':
                    # Close workflow if there are no more activity tasks running.
                    decisions.schedule_activity_task('ca-%i' % time.time(),
                         'CountOpenAssignments', VERSION, task_list=TASKLIST)
                elif activity_name == 'CountOpenAssignments' and int(result) == 0:
                    decisions.complete_workflow_execution()
                else:
                    decisions.schedule_activity_task('ra-%i' % time.time(),
                        'ReceiveAssignment', VERSION, task_list=TASKLIST)

            self.complete(decisions=decisions)
            return True

decider = MturkRequesterDecider()
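The decider finds the scheduled event by indexing history['events'] with scheduledEventId - 1, which relies on the list holding the complete history in order, starting at eventId 1. If that assumption ever breaks (for example, if a long history comes back in pages), looking the event up by its eventId field is more robust. A minimal sketch, with a hypothetical helper name:

```python
def completed_activity_name(history, completed_event):
    """Name of the activity whose ActivityTaskCompleted event is given.

    Looks the ActivityTaskScheduled event up by eventId instead of assuming
    that list position == eventId - 1.
    """
    events_by_id = dict((event['eventId'], event) for event in history['events'])
    attrs = completed_event['activityTaskCompletedEventAttributes']
    scheduled = events_by_id[attrs['scheduledEventId']]
    return scheduled['activityTaskScheduledEventAttributes']['activityType']['name']
```

Inside run(), last_event could be passed as completed_event, replacing steps 1a and 1b.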

Mock Implementation
It should be possible to tweak the following mock implementation to have this code actually talk to MTurk and SQS. Right now it's just a proof of concept for the purpose of toying with Amazon SWF.

#activity_impl.py
import time
import random

REMAINING_ASSIGNMENTS = 3
RECEIVE_DELAY = 2
RANDOM_RANGE = 10
def receive_assignment():
    time.sleep(RECEIVE_DELAY)
    assignment_result = int(random.random() * RANDOM_RANGE)
    return {
        'assignment_id': REMAINING_ASSIGNMENTS,
        'result': assignment_result
    }

def verify_assignment(assignment_result):
    if int(assignment_result) > RANDOM_RANGE/2:
        return (False, 'Number too big')
    return (True, None)

def count_open_assignments():
    return REMAINING_ASSIGNMENTS

def approve_assignment(assignment_id):
    global REMAINING_ASSIGNMENTS
    REMAINING_ASSIGNMENTS -= 1

def reject_assignment(assignment_id):
    # No action needed for purposes of the demo.
    pass

def reschedule_assignment(assignment_id):
    # No action needed for purposes of the demo.
    pass

Now kick off a new workflow execution and run the decider and the worker from the interactive interpreter step by step. Remember to export the access and secret keys to the environment as above. To kick off the workflow:
$ ipython
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
Type "copyright", "credits" or "license" for more information.

IPython 0.10 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object'. ?object also works, ?? prints more.

In [1]: from config import DOMAIN, WF_NAME, VERSION, TASKLIST
In [2]: import boto.swf.layer2 as swf
In [3]: wf = swf.WorkflowType(domain=DOMAIN, name=WF_NAME, version=VERSION)
In [4]: wf.start(task_list=TASKLIST)
Out[4]: 
Then to have the decider make the first move:
$ python -i mturk_decider.py 
>>> decider.run()
{'previousStartedEventId': 0, 'workflowExecution': {'workflowId': 'ProcessHitGroup-1.0-1360544757', 'runId': ...}
True
>>> 
And to have the worker respond:
$ python -i mturk_worker.py 
>>> worker.run()
{'previousStartedEventId': 0, 'workflowExecution': {'workflowId': 'ProcessHitGroup-1.0-1360544757', 'runId': ...}
True
>>> 
Keep calling the run() method on both actors in turn, watching the workflow's progress in the AWS SWF console.

Saturday, February 9, 2013

The Productive Programmer

I've just finished reading Neal Ford's The Productive Programmer and it was a very good read indeed. I like timeless books, and it is fair to say that this one is universal for the most part, except perhaps for the first few chapters, which include a bunch of OS- and text-editor-specific productivity tricks. I didn't find them all particularly useful, but maybe it's just me. For some reason it felt like a marketing trick, by means of which the reader gets "immediate value" out of the first few pages and is pulled in for the rest of the book. It actually had the opposite effect on me. I remember thinking that if the book was like this throughout, I'd better stop reading now - but I'm glad that I persevered and uncovered countless pearls of wisdom in the following chapters.

If the book doesn't make you more productive, then at the very least you'll be able to relate to many of the observations Neal points out.

Saturday, February 2, 2013

#matplotlib: Comparative histogram recipe

When comparing the distributions of two related data sets, it often makes sense to present the data on a comparative histogram, i.e. two histograms, one for each data set, contrasted against each other. 

Comparative histograms can be used to demonstrate a change in performance when enough samples are available (e.g. latency of calls to a server), or to compare two statistical populations (e.g. an age pyramid). 
Here is the code:
"""comphist.py"""

import numpy as np
import matplotlib.pyplot as plt

def comphist(x1, x2, orientation='vertical', **kwargs):
    """Draw a comparative histogram."""
    # Split keyword args:
    kwargs1 = {}
    kwargs2 = {}
    kwcommon = {}
    for arg in kwargs:
        tgt_arg = arg[:-1]
        if arg.endswith('1'):
            arg_dict = kwargs1
        elif arg.endswith('2'):
            arg_dict = kwargs2
        else:
            arg_dict = kwcommon
            tgt_arg = arg
        arg_dict[tgt_arg] = kwargs[arg]
    kwargs1.update(kwcommon)
    kwargs2.update(kwcommon)

    fig = plt.figure()

    # Have both histograms share one axis.
    if orientation == 'vertical':
        ax1 = plt.subplot(211)
        ax2 = plt.subplot(212, sharex=ax1)
        plt.setp(ax1.get_xticklabels(), visible=False)
        legend_loc = (1, 4)
    else:
        ax1 = plt.subplot(122)
        ax2 = plt.subplot(121, sharey=ax1)
        plt.setp(ax1.get_yticklabels(), visible=False)
        legend_loc = (1, 2)

    ax1.hist(x1, orientation=orientation, **kwargs1)
    ax2.hist(x2, orientation=orientation, **kwargs2)
    # Mirror the ax2 histogram (upside down when vertical, left-right when
    # horizontal). This is done after plotting: calling set_ylim()/set_xlim()
    # on the empty axis beforehand would switch off autoscaling.
    if orientation == 'vertical':
        ax2.invert_yaxis()
    else:
        ax2.invert_xaxis()
    ax1.legend(loc=legend_loc[0])
    ax2.legend(loc=legend_loc[1])
    # Tighten up the layout.    
    plt.subplots_adjust(wspace=0.0, hspace=0.0)
    return fig

if __name__ == "__main__":
    comphist(np.random.randn(1000), np.random.randn(1000), 
             label1='before', label2='after', color2='green', bins=30, rwidth=1)
    plt.show()
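One thing to watch with bins=30: each hist() call derives its own bin edges from its own data range, so the bars of the two panels may not line up. A hedged suggestion is to compute one set of edges from both data sets and pass that array as bins. shared_bin_edges is a hypothetical helper, and np.histogram_bin_edges requires NumPy 1.15 or later.

```python
import numpy as np


def shared_bin_edges(x1, x2, bins=30):
    """Bin edges spanning both data sets, so the two histograms line up."""
    return np.histogram_bin_edges(np.concatenate([x1, x2]), bins=bins)


if __name__ == '__main__':
    before = np.random.randn(1000)
    after = np.random.randn(1000) + 0.5
    edges = shared_bin_edges(before, after)
    counts_before, _ = np.histogram(before, bins=edges)
    counts_after, _ = np.histogram(after, bins=edges)
    # Every sample of both sets falls inside the shared edges.
    print(len(edges), counts_before.sum(), counts_after.sum())
```

The resulting array can be passed straight through, e.g. comphist(before, after, label1='before', label2='after', bins=edges), since hist() accepts an array of edges for bins.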

Saturday, January 26, 2013

Refactoring by Martin Fowler #Java

Refactoring is a classic about productive work with suboptimally designed and legacy code. It's a must-read for any Java developer. The book was written by Martin Fowler with contributions from other authors.

One of the reasons Refactoring is so popular is that it was written with the reader in mind (which can't really be said about all software books). The language is lucid and understandable. The author opens with a simple yet powerful example of refactoring. On the one hand, this example whets the reader's appetite by demonstrating how effective refactoring can be when approached systematically. On the other hand, it familiarizes the reader with the layout convention used in the book - I thought this was a very smart trick.

Here is a word of caution: do not get an e-book. Refactoring is designed to be read in hard copy. It's a great read on its own, but it's also a supreme reference guide, and e-books just aren't cut out for that.