Sunday, February 10, 2013

Simple Workflow example with boto.swf: the #MTurk Requester

In this post I'm going to demonstrate the application of Amazon Simple Workflow using Eric's suggestion from the original boto.swf post as an example.

As a resident of Europe, I'm unable to use MTurk as a requester, so my assumptions about the MTurk Requester and its operation come from the code in boto.mturk. I'm hoping that the workflow presented below is simple and generic enough to be easily tweaked as necessary.

The scenario goes as follows: The requester publishes a HIT group and waits for solutions from individual human workers to start coming in. As the solutions arrive, they're subjected to a number of validity checks (such as the rules listed in the HIT description). If the checks pass, the requester wants to approve the assignment and reward the human worker. Otherwise, the assignment is marked as rejected and may resurface on the MTurk website for other workers. When all assignments are completed, the HIT group may be closed and the workflow finishes.

So first, let's model this scenario as a workflow, starting by identifying the activities. I see five activities here, with [nicknames] in square brackets:
  • Receive a completed assignment from a worker [ReceiveAssignment],
  • verify its correctness [VerifyAssignment],
  • approve when correct [ApproveAssignment],
  • reject otherwise [RejectAssignment],
  • see how many are left to do [CountOpenAssignments].
Now let's think about a control flow that builds the business logic out of these blocks:
  • poll for a completed assignment and on arrival of a solution
  • validate it
    • if the solution is valid,
      • approve and reward the worker,
      • count remaining open tasks
        • if none are left, then close the workflow.
    • if, on the other hand, the checks fail, 
      • reject solution and
      • post the assignment once again,
  • poll for the next assignment (goto start)
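
The control flow above can be written down as a pure function before any SWF code gets involved. What follows is a minimal sketch, not part of boto: `next_step` and the `('schedule', name)` / `('complete', None)` tuples are names made up for illustration, and it assumes the activity results have already been decoded (a dict with an `acceptable` key for VerifyAssignment, an integer for CountOpenAssignments).

```python
def next_step(completed_activity, result=None):
    """Return the next action for the workflow as an (action, activity) tuple.

    completed_activity is None when the workflow has only just started.
    """
    if completed_activity is None:
        return ('schedule', 'ReceiveAssignment')
    if completed_activity == 'ReceiveAssignment':
        return ('schedule', 'VerifyAssignment')
    if completed_activity == 'VerifyAssignment':
        if result['acceptable']:
            return ('schedule', 'ApproveAssignment')
        return ('schedule', 'RejectAssignment')
    if completed_activity == 'ApproveAssignment':
        return ('schedule', 'CountOpenAssignments')
    if completed_activity == 'CountOpenAssignments' and result == 0:
        return ('complete', None)
    # Rejected assignment, or open assignments remain: poll for the next one.
    return ('schedule', 'ReceiveAssignment')
```

Keeping the rules in one small function like this makes it possible to check them without an AWS account; the decider further down implements the same branching against the real event history.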

Before we start, we'll need to register the workflow and its activities with Amazon Simple Workflow Service. It's a one-off action and can be done from an interactive interpreter.
$ export AWS_ACCESS_KEY_ID=access_key
$ export AWS_SECRET_ACCESS_KEY=secret_key
$ ipython
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41)
Type "copyright", "credits" or "license" for more information.

IPython 0.10 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object'. ?object also works, ?? prints more.

In [1]: import boto.swf.layer2 as swf
In [2]: DOMAIN='mturk_requester'
In [3]: WF_NAME = 'ProcessHitGroup' 
In [4]: TASKLIST = WF_NAME + 'Tasks' 
In [5]: VERSION = '1.0'
In [6]: mturk_domain = swf.Domain(name=DOMAIN)
In [7]: mturk_domain.register()
In [8]: workflow = swf.WorkflowType(domain=DOMAIN, name=WF_NAME, version=VERSION, 
...                                 task_list=TASKLIST)

In [9]: workflow.register()
In [10]: for activity_name in ('ReceiveAssignment', 'VerifyAssignment', 'ApproveAssignment',
...                            'RejectAssignment', 'CountOpenAssignments'):
...    swf.ActivityType(domain=DOMAIN, name=activity_name, version=VERSION,
...                     task_list=TASKLIST).register()
 
# To verify successful registration:

In [11]: mturk_domain.workflows()
Out[11]: [<WorkflowType 'ProcessHitGroup-1.0' at 0x1e31550>]
In [12]: mturk_domain.activities()
Out[12]: 
[<ActivityType 'ApproveAssignment-1.0' at 0x1e31950>,
 <ActivityType 'CountOpenAssignments-1.0' at 0x1e31a90>,
 <ActivityType 'ReceiveAssignment-1.0' at 0x1e31c10>,
 <ActivityType 'RejectAssignment-1.0' at 0x1e31dd0>,
 <ActivityType 'VerifyAssignment-1.0' at 0x1e31f90>]

In order to minimize data duplication, I recommend creating a config.py file and populating it with constants.

# config.py
DOMAIN = 'mturk_requester'
WF_NAME = 'ProcessHitGroup'
TASKLIST = WF_NAME + 'Tasks'
VERSION = '1.0'

The Worker
You can run a dedicated activity worker for each type of activity, or you can implement a worker capable of responding to a number of activities and run a few such workers in parallel. Workers capable of responding to different types of tasks might not be viable for all kinds of workflows, but they might increase robustness: some of your workers may crash and never recover. If a group of workers specialized for one particular activity dies out, the workflow may hang, whereas a single surviving poly-task worker will still plough along through the workflow and will bring a workflow to a close successfully.
import activity_impl
import boto.swf.layer2 as swf
import json
from config import *

class MturkRequesterActivityWorker(swf.ActivityWorker):
    
    domain = DOMAIN
    version = VERSION
    task_list = TASKLIST

    def run(self):
        activity_task = self.poll()
        # poll() returns a dictionary. When no tasks are enqueued in SWF,
        # the connection is kept open for a minute (long poll) and the service
        # comes back with {...}. When an actual task is received, the dictionary 
        # contains 'activityId' key.
        print activity_task
        if 'activityId' in activity_task:
            # Look up the handler for the requested activity type.
            activity = {
                'ReceiveAssignment': self._receive_assignment,
                'VerifyAssignment': self._verify_assignment,
                'CountOpenAssignments': self._count_open_assignments,
                'RejectAssignment': self._reject_assignment,
                'ApproveAssignment': self._approve_assignment
            }.get(activity_task['activityType']['name'])

            # If anything goes wrong, report the failure and its reason to SWF,
            # then re-raise so the error is also visible locally.
            try:
                activity(activity_task.get('input'))
            except Exception as error:
                self.fail(reason=str(error))
                raise

            return True

    def _receive_assignment(self, _):
        # Input into this activity is disregarded.
        assignment = activity_impl.receive_assignment()
        self.complete(result=json.dumps(assignment))

    def _verify_assignment(self, assignment_contents):
        # Verification either succeeds or fails; on failure, a reason
        # may be provided.
        assignment = json.loads(assignment_contents)
        acceptable, reason = activity_impl.verify_assignment(assignment['result'])
        self.complete(result=json.dumps({
            'acceptable': acceptable,
            'reason': reason,
            'assignment_id' : assignment['assignment_id']
        }))

    def _count_open_assignments(self, _):
        # Return assignment count. 
        self.complete(result=str(activity_impl.count_open_assignments()))

    def _reject_assignment(self, assignment_id):
        activity_impl.reject_assignment(assignment_id)
        # Optionally:
        activity_impl.reschedule_assignment(assignment_id)
        self.complete()

    def _approve_assignment(self, assignment_id):
        activity_impl.approve_assignment(assignment_id)
        self.complete()

worker = MturkRequesterActivityWorker()

The Decider
The thing to remember when implementing a decider is that a decider is meant to embody the business logic and the business logic alone. In particular, the only external web service a decider should be calling is AWS Simple Workflow. If a decider needs additional information to make a decision, and getting it requires even a simple non-local HTTP GET call, resist the urge to implement the calling code in the decider. Delegate the operation to an activity worker as a small task instead.
import time
import json
from config import *
import boto.swf.layer2 as swf

class MturkRequesterDecider(swf.Decider):

    domain = DOMAIN
    task_list = TASKLIST
    version = VERSION

    def run(self):
        history = self.poll()
        print history
        if 'events' in history:
            decisions = swf.Layer1Decisions()
            # Decision* events are irrelevant here and can be ignored.
            workflow_events = [e for e in history['events'] 
                               if not e['eventType'].startswith('Decision')]
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task('ra-%i' % time.time(),
                                                 'ReceiveAssignment',
                                                 VERSION, task_list=TASKLIST)
            elif last_event_type == 'ActivityTaskCompleted':
                # Take decision based on the name of activity that has just completed.
                # 1. Get the name of the completed activity.
                # 1a. What is the event id of the completed activity? 
                completed_activity_id = last_event['activityTaskCompletedEventAttributes']['scheduledEventId'] - 1
                # 1b. What is the name of the completed activity?
                activity_data = history['events'][completed_activity_id]
                activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                activity_name = activity_attrs['activityType']['name']
                # 1c. Get the result from the activity, if any at all.
                result = last_event['activityTaskCompletedEventAttributes'].get('result')
                # Take the decision.
                if activity_name == 'ReceiveAssignment':
                    # Completed Assignment just came in. Verify it.
                    decisions.schedule_activity_task('va-%i' % time.time(),
                                                     'VerifyAssignment',
                                                     VERSION, task_list=TASKLIST,
                                                     input=result)
                elif activity_name == 'VerifyAssignment':
                    output = json.loads(result)
                    if output['acceptable']:
                        decisions.schedule_activity_task('aa-%i' % time.time(),
                            'ApproveAssignment', VERSION, task_list=TASKLIST,
                            input=str(output['assignment_id']))
                    else:
                        decisions.schedule_activity_task('da-%i' % time.time(),
                            'RejectAssignment', VERSION, task_list=TASKLIST,
                            input=str(output['assignment_id']))
                elif activity_name == 'ApproveAssignment':
                    # Count the remaining open assignments; the workflow is
                    # closed once none are left.
                    decisions.schedule_activity_task('ca-%i' % time.time(),
                         'CountOpenAssignments', VERSION, task_list=TASKLIST)
                elif activity_name == 'CountOpenAssignments' and int(result) == 0:
                    decisions.complete_workflow_execution()
                else:
                    # Assignment rejected, or open assignments remain:
                    # fetch the next assignment.
                    decisions.schedule_activity_task('ra-%i' % time.time(),
                        'ReceiveAssignment', VERSION, task_list=TASKLIST)

            self.complete(decisions=decisions)
            return True

decider = MturkRequesterDecider()
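
The decider above finds the scheduled event by list position (`scheduledEventId - 1`), which relies on the polled history starting at event 1 and arriving in full. A slightly more defensive variant, sketched below, looks the event up by its `eventId` instead. `completed_activity_name` is a hypothetical helper, not part of boto, and the sample history is hand-written and abbreviated; the field names are the ones used in the decider plus the `eventId` that SWF attaches to every history event.

```python
def completed_activity_name(events, completed_event):
    """Return the activity name behind an ActivityTaskCompleted event.

    Looks the scheduled event up by its eventId rather than by list index.
    Returns None if the scheduled event is not among the given events.
    """
    attrs = completed_event['activityTaskCompletedEventAttributes']
    scheduled_id = attrs['scheduledEventId']
    for event in events:
        if event['eventId'] == scheduled_id:
            scheduled = event['activityTaskScheduledEventAttributes']
            return scheduled['activityType']['name']
    return None


# A hand-written, abbreviated history used only for illustration.
history = [
    {'eventId': 1, 'eventType': 'WorkflowExecutionStarted'},
    {'eventId': 5, 'eventType': 'ActivityTaskScheduled',
     'activityTaskScheduledEventAttributes': {
         'activityType': {'name': 'ReceiveAssignment', 'version': '1.0'}}},
    {'eventId': 6, 'eventType': 'ActivityTaskStarted'},
    {'eventId': 7, 'eventType': 'ActivityTaskCompleted',
     'activityTaskCompletedEventAttributes': {
         'scheduledEventId': 5, 'startedEventId': 6, 'result': '{}'}},
]
name = completed_activity_name(history, history[-1])
```

Whether the extra scan is worth it depends on how long the histories get; for a demo of this size either approach should do.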

Mock Implementation
It should be possible to tweak the following mock implementation so that this code actually talks to MTurk and SQS. Right now it's just a proof of concept for toying with Amazon SWF.

#activity_impl.py
import time
import random

REMAINING_ASSIGNMENTS = 3
RECEIVE_DELAY = 2
RANDOM_RANGE = 10
def receive_assignment():
    time.sleep(RECEIVE_DELAY)
    assignment_result = int(random.random() * RANDOM_RANGE)
    return {
        'assignment_id': REMAINING_ASSIGNMENTS,
        'result': assignment_result
    }

def verify_assignment(assignment_result):
    if int(assignment_result) > RANDOM_RANGE/2:
        return (False, 'Number too big')
    return (True, None)

def count_open_assignments():
    return REMAINING_ASSIGNMENTS

def approve_assignment(assignment_id):
    global REMAINING_ASSIGNMENTS
    REMAINING_ASSIGNMENTS -= 1

def reject_assignment(assignment_id):
    # No action needed for purposes of the demo.
    pass

def reschedule_assignment(assignment_id):
    # No action needed for purposes of the demo.
    pass

Now kick off a new workflow execution and run the decider and the worker from the interactive interpreter step by step. Remember to export the access and secret keys to the environment as above. To kick off the workflow:
$ ipython
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
Type "copyright", "credits" or "license" for more information.

IPython 0.10 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object'. ?object also works, ?? prints more.

In [1]: from config import DOMAIN, WF_NAME, VERSION, TASKLIST
In [2]: import boto.swf.layer2 as swf
In [3]: wf = swf.WorkflowType(domain=DOMAIN, name=WF_NAME, version=VERSION)
In [4]: wf.start(task_list=TASKLIST)
Out[4]: 
Then to have the decider make the first move:
$ python -i mturk_decider.py 
>>> decider.run()
{'previousStartedEventId': 0, 'workflowExecution': {'workflowId': 'ProcessHitGroup-1.0-1360544757', 'runId': ...}
True
>>> 
And to have the worker respond:
$ python -i mturk_worker.py 
>>> worker.run()
{'previousStartedEventId': 0, 'workflowExecution': {'workflowId': 'ProcessHitGroup-1.0-1360544757', 'runId': ...}
True
>>> 
Keep calling the run() method on both actors while watching the workflow's progress in the AWS SWF console.
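
Calling run() by hand gets tedious once the workflow behaves. A small loop can drive either actor; the sketch below is a convenience under stated assumptions, not a boto feature. `run_until_idle`, `max_idle_polls` and `FakeActor` are hypothetical names, and the loop relies only on the convention used in this post that run() returns True after handling a task and None when the long poll comes back empty.

```python
def run_until_idle(actor, max_idle_polls=3):
    """Call actor.run() until it comes back empty max_idle_polls times in a row.

    Returns the number of tasks that were handled.
    """
    handled = 0
    idle = 0
    while idle < max_idle_polls:
        if actor.run():
            handled += 1
            idle = 0
        else:
            idle += 1
    return handled


class FakeActor(object):
    """Stand-in for the decider or worker so the loop can be tried offline."""

    def __init__(self, outcomes):
        self.outcomes = list(outcomes)

    def run(self):
        # Report "no task" once the scripted outcomes are used up.
        return self.outcomes.pop(0) if self.outcomes else None


handled = run_until_idle(FakeActor([True, None, True, True]), max_idle_polls=2)
```

With the real objects this would be `run_until_idle(decider)` in one terminal and `run_until_idle(worker)` in another.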

6 comments:

  1. Thanks for the effort, but this blog is incredibly hard to read.

    I've just spent 10 minutes trying to find a chrome extension to disable styles so I can actually read it. No luck so far. I keep saying "screw it, I'll just read it as-is". Then I try, then I go back to searching for a way to disable styles.

  2. @Nate thanks a lot for feedback. I'll try to do something about it.

  3. How to fetch input passed in workflow inside decider?

    I want to pass some data to workflow to worker, can someone help me please?
