Saturday, January 19, 2013

boto.swf - #Amazon Simple Workflow in #Python

boto is a popular Python interface to Amazon Web Services. The package was originally written by Mitch Garnaat, the author of Python and AWS Cookbook. With version 2.7.0, released on 2013-01-10, boto also includes support for Amazon Simple Workflow (SWF) by providing an object-oriented interface to the service. boto.swf.layer2 is my humble contribution.
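
If you want to follow along, make sure your boto is at version 2.7.0 or newer. A typical upgrade looks like this (assuming pip is available; the version check should print 2.7.0 or higher):

$ pip install --upgrade boto
$ python -c 'import boto; print boto.__version__'
2.7.0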

SWF is an exciting service that makes coordinating distributed systems fun. As a cloud service, it takes away the tedium of synchronization and distributed locking and lets you focus on the business logic. In essence, SWF will help you orchestrate your distributed applications.

Here is how I like to think about the service, in a nutshell:
  1. A workflow is a distributed program.
    It encodes business logic, i.e. a set of instructions and decisions. Decisions are made based on the outcomes of actions. Two main types of actors implement the business logic of a workflow: workers, whose job is to take inputs, carry out tasks and return results, and deciders, responsible for orchestrating the workflow, i.e. for making decisions based on what the workers come back with. A minimal useful workflow must implement one decider and one worker.
  2. Workflows are meant to run from multiple machines.
    Workflows are distributed by nature, which brings improved reliability through redundancy. A workflow running from a fleet of hosts (multiple deciders talking to multiple workers) eliminates the single point of failure that non-distributed applications running on a single machine are prone to. The time of execution may vary with the number of computers involved, but a workflow's outcome should always be the same irrespective of the number and distribution of workers.
  3. All actors are independent.
    Actors are independent of and unaware of each other. Both deciders and activity workers poll the service for tasks, and when a task arrives, the actor de-queues it from the service and starts cracking on it. All timing information is recorded and managed by the service, which may cancel a task or terminate a workflow based on pre-defined timeouts; a sketch of how such timeouts are pre-defined follows this list.
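
To make the last point concrete, here is a minimal sketch of pre-defining timeouts at activity registration time. It goes through the lower-level boto.swf.layer1 client and assumes its register_activity_type call mirrors the SWF RegisterActivityType API (durations are strings, in seconds, or 'NONE'); treat the exact parameter names as an illustration rather than a reference. The domain, activity and task list names match the test setup used later in this post.

import boto.swf.layer1 as swf_l1

l1 = swf_l1.Layer1('your_access_key', 'your_secret_key')
# Pre-define how long an activity may wait in the queue and how long it may run.
# SWF enforces these limits server-side, so actors never track time themselves.
# Note: SWF refuses to re-register an existing name/version pair, so defaults
# must be supplied on first registration (or under a fresh version).
l1.register_activity_type(
    'botoTest', 'botoTestActivity', 'v2',
    task_list='botoTestTaskList',
    default_task_schedule_to_start_timeout='600',   # max 10 minutes in the queue
    default_task_start_to_close_timeout='600',      # max 10 minutes of processing
    default_task_schedule_to_close_timeout='1200',  # 20 minutes end to end
    default_task_heartbeat_timeout='NONE')          # no heartbeat requirement
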
And here is how simple creating a workflow with boto.swf can be. Consider the following block of code, which bootstraps a test domain (via the setup_swf() call) and implements two kinds of business logic: one for parallel and one for serial task execution.

Note: This workflow will behave the same regardless of how many actors you launch, and irrespective of the physical location your activity workers and deciders run from.

The only prerequisite is Internet access for talking to the Amazon SWF service.

#!/usr/bin/python

import boto.swf.layer2 as swf
import time

ACCESS = 'your_access_key'  # replace with your AWS access key ID
SECRET = 'your_secret_key'  # replace with your AWS secret access key
DOMAIN = 'botoTest'
WORKFLOW = DOMAIN + 'Workflow'
TASKLIST = DOMAIN + 'TaskList'
ACTIVITY = DOMAIN + 'Activity'
VERSION = 'v1'

def setup_swf():
    swf.set_default_credentials(ACCESS, SECRET) 
    try:
        print 'Registering domain', DOMAIN
        swf.Domain(name=DOMAIN).register()
    except Exception as e:
        print e
    try:
        print 'Registering activity', ACTIVITY
        swf.ActivityType(domain=DOMAIN, name=ACTIVITY, version=VERSION,
                         task_list=TASKLIST).register()
    except Exception as e:
        print e
    try:
        print 'Registering workflow', WORKFLOW
        swf.WorkflowType(domain=DOMAIN, name=WORKFLOW, version=VERSION,
                         task_list=TASKLIST).register()
    except Exception as e:
        print e

class TestDecider(swf.Decider):

    SCHED_COUNT = 5

    def run_parallel(self):
        decision_task = self.poll()
        if 'events' in decision_task:
            decisions = swf.Layer1Decisions()
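            # Layer1Decisions accumulates the decisions made while examining the
            # history; they are sent back to SWF in a single complete() call below.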
            # Decision* events are irrelevant here and can be ignored.
            workflow_events = [e for e in decision_task['events'] 
                               if not e['eventType'].startswith('Decision')]
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # At start, kickoff SCHED_COUNT activities in parallel.
                for i in range(TestDecider.SCHED_COUNT):
                    decisions.schedule_activity_task('activity%i' % i, ACTIVITY, VERSION, 
                                                     task_list=TASKLIST)
            elif last_event_type == 'ActivityTaskCompleted':
                # Monitor progress. When all activities complete, complete workflow.
                completed_count = sum([1 for a in decision_task['events']
                                       if a['eventType'] == 'ActivityTaskCompleted'])
                print '%i/%i' % (completed_count, TestDecider.SCHED_COUNT)
                if completed_count == TestDecider.SCHED_COUNT:
                    decisions.complete_workflow_execution()
            self.complete(decisions=decisions)
            return True

    def run_serial(self):
        decision_task = self.poll()
        if 'events' in decision_task:
            decisions = swf.Layer1Decisions()
            completed_count = sum([1 for a in decision_task['events']
                                   if a['eventType'] == 'ActivityTaskCompleted'])
            print '%i/%i' % (completed_count, TestDecider.SCHED_COUNT)
            if completed_count == TestDecider.SCHED_COUNT:
                decisions.complete_workflow_execution()
            else:
                decisions.schedule_activity_task('activity%i' % completed_count, 
                                                 ACTIVITY, VERSION, 
                                                 task_list=TASKLIST)
            self.complete(decisions=decisions)
            return True

class TestWorker(swf.ActivityWorker):

    def run(self):
        """Report current time."""
        activity_task = self.poll()
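        # poll() long-polls the service; the response dict only carries an
        # 'activityId' key when an actual task was handed out.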
        if 'activityId' in activity_task:
            self.complete(result=str(time.time()))
            return True

Copy and paste the code above into a file called boto_swf_test.py and execute it in an interactive interpreter to a) bootstrap the domain, b) kick off a workflow execution and c) launch actors.


$ python -i boto_swf_test.py
>>> setup_swf()
Registering domain botoTest
Registering activity botoTestActivity
Registering workflow botoTestWorkflow
>>> wkf = swf.WorkflowType(domain=DOMAIN, name=WORKFLOW, version=VERSION, task_list=TASKLIST)
>>> wkf.start() # Kicks off workflow execution

>>> wrk = TestWorker(domain=DOMAIN, task_list=TASKLIST)
>>> while wrk.run(): pass  # Launches an activity worker

Now you'll need at least one more Python interpreter to run the decider.

$ python -i boto_swf_test.py
>>> swf.set_default_credentials(ACCESS, SECRET)
>>> dec = TestDecider(domain=DOMAIN, task_list=TASKLIST)

and you're ready to run the workflow in parallel
>>> while dec.run_parallel(): pass # Runs activities in parallel.

or serially
>>> while dec.run_serial(): pass

In both cases the decider will print progress and hang on for one minute (the long-poll timeout) after the last decision before it closes the connection to SWF.
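
If you would rather keep an actor alive indefinitely instead of letting it exit after an idle minute, wrap the call in an outer loop (stop it with Ctrl-C); this sketch reuses only the classes defined above:

>>> while True:
...     # run_parallel() returns None when the long poll comes back empty,
...     # so we simply keep polling until interrupted.
...     if not dec.run_parallel():
...         print 'Nothing to decide yet, polling again...'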

The examples above illustrate the serial and parallel orchestration of tasks, but nothing more than that. If you can think of a good example of a workflow that you'd like to see implemented, just call it out in the comments below and I'll do my best to implement a few of your suggestions.

4 comments:

  1. Thank you for this post. This is very informative. I would like to see an example of Amazon SWF using Amazon Mechanical Turk to classify images. I am still putting the pieces together, but I am unclear on how to let SWF know that a HIT is completed. Thanks again!

  2. Hi Eric! An MTurk job can be modeled as a parallel SWF workflow: a number of tasks are dispatched to individual workers in parallel, then each task may either time out (at which point the decider reschedules it) or be completed (the decider marks progress). If all tasks get completed within the workflow's duration limit, the workflow completes as successful. Otherwise, a workflow-level timeout takes place and the job is closed as partially done.

    With that in mind, the MTurk service manages all of that for you - the service is the "decider" and worker tasks are delegated to people through a web browser. In other words, MTurk takes away the necessity to create and manage workflows on your own.

    Let me know if I'm missing something and I'll be happy to publish some code snippets in exchange.
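
    In the meantime, here is a rough, untested sketch of the reschedule-on-timeout part of such a decider. It reuses the Decider pattern and the ACTIVITY, VERSION and TASKLIST constants from the post above; the 'retry-...' activity id scheme is made up for illustration only:

    class RescheduleDecider(swf.Decider):

        def run(self):
            decision_task = self.poll()
            if 'events' in decision_task:
                decisions = swf.Layer1Decisions()
                for event in decision_task['events']:
                    if event['eventType'] == 'ActivityTaskTimedOut':
                        # A worker went silent; hand out the same kind of task again.
                        # A real decider would also check that this timeout has not
                        # already been retried on an earlier decision task.
                        decisions.schedule_activity_task(
                            'retry-%i' % event['eventId'], ACTIVITY, VERSION,
                            task_list=TASKLIST)
                self.complete(decisions=decisions)
                return True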

  3. Hi Sławek,

    Thank you for the prompt response. I'm envisioning something along the lines of use case #2 at http://aws.amazon.com/swf/faqs/#some_use_cases. Furthermore, MTurk might get called upon multiple times (e.g. HIT#1: Does the image contain human faces? (Y/N); if yes, then HIT#2: Identify the location of the faces (to be pixelated)). My understanding is that I will need to create a decider that will ping MTurk to check if the work is completed and report back to SWF, unless there is a more elegant way to accomplish this.

    Thanks for your help,

  4. I see; you're looking for a workflow that interacts with MTurk on the requester's side (approve/reject/reschedule/resubmit) as the HITs are being processed - notably, getting the decider to know when it is time to close a HIT group. Cool, it will make for an interesting example of a workflow. I'll take a couple of days to put it together, juggling other commitments. I'll keep it simple and focus on the workflow part rather than on the MTurk interaction.
