MapReduce is a patented software framework introduced by Google to support distributed computing on large data sets on clusters of computers. The framework is inspired by the map and reduce functions in functional programming, although their purpose in the MapReduce framework is not the same as in their original forms.
Check out the mapreduce folder into your application directory:
svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce
Add the mapreduce handler to your app.yaml:
handlers:
- url: /mapreduce(/.*)?
  script: mapreduce/main.py
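The same URL also serves the framework's job status UI. If you don't want it publicly reachable, you can additionally set login: admin on this handler; task queue requests still reach admin-protected URLs.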
Defining a Mapper
Create a function taking a single argument. It will be called once for each value returned by the input reader. An example mapper for the BlobstoreLineInputReader might look like this (a minimal sketch; LogLine is a hypothetical model used only for illustration):
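from mapreduce import operation as op
from models import LogLine  # hypothetical db.Model with a 'content' StringProperty

def line_mapper(input_tuple):
    byte_offset, line = input_tuple  # BlobstoreLineInputReader yields (offset, line)
    yield op.counters.Increment('lines_read')  # custom counter, visible on the status page
    if line.strip():
        yield op.db.Put(LogLine(content=line))  # puts are batched by the framework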
Specifying Readers
The mapreduce library isn't restricted to mapping over datastore entities. It comes bundled with other input readers, defined in mapreduce/input_readers.py. Currently these include DatastoreInputReader (the default), BlobstoreLineInputReader, which maps over lines from one or more blobs in the blobstore, and BlobstoreZipInputReader, which maps over the contents of zip files in the blobstore.
Provided Input Readers
DatastoreInputReader
DatastoreInputReader reads all model instances of a particular kind from the datastore. It requires the model class named by entity_kind to be defined.
Parameter | Default Value | Explanation
entity_kind | None | The datastore kind to map over.
namespaces | The current namespace | The list of namespaces that will be searched for entities of the given kind.
batch_size | 50 | The number of entities to read from the datastore with each batch get.
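With DatastoreInputReader the mapper receives one model instance per call. A minimal sketch, assuming a hypothetical UserProfile model, that simply re-puts every entity (for example, to apply new property defaults):

from mapreduce import control
from mapreduce import operation as op

def touch_profile(entity):
    # DatastoreInputReader passes a full model instance per call.
    yield op.db.Put(entity)  # re-save the entity; puts are batched by the framework

control.start_map('Touch profiles',
                  'main.touch_profile',
                  'mapreduce.input_readers.DatastoreInputReader',
                  {'entity_kind': 'models.UserProfile'},  # hypothetical model class
                  shard_count=8)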
DatastoreKeyInputReader
DatastoreKeyInputReader reads the keys of entities of a particular kind from the datastore. It doesn't require the model class named by entity_kind to be defined.
Parameter | Default Value | Explanation
entity_kind | None | The datastore kind to map over.
namespaces | The current namespace | The list of namespaces that will be searched for entities of the given kind.
batch_size | 50 | The number of keys to read from the datastore with each batch get.
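Because this reader yields keys rather than full entities, it avoids fetching entity data and is well suited to jobs such as bulk deletion. A minimal sketch:

from mapreduce import operation as op

def delete_entity(key):
    # DatastoreKeyInputReader passes a db.Key per call; no entity is fetched.
    yield op.db.Delete(key)  # deletes are batched by the framework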
BlobstoreLineInputReader
BlobstoreLineInputReader reads a delimited text file one line at a time. It calls the mapper once for each line, passing it a tuple comprising the byte offset of the first character of the line in the file and the line as a string, without the trailing newline. In other words: (byte_offset, line_value).
Parameter | Default Value | Explanation
blob_keys | None | Either a string containing a blob key string or an array containing multiple blob key strings.
BlobstoreZipInputReader
BlobstoreZipInputReader iterates over all the compressed files in a zipfile in the Blobstore. It calls the mapper once for each file, passing it a tuple comprising the zipfile.ZipInfo entry for the file and a callable that returns the complete body of the file as a string. In other words: (zipinfo, file_callable).
Parameter | Default Value | Explanation
blob_key | None | A string containing a blob key.
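A minimal sketch of a mapper for this reader, which just logs the size of each member of the zipfile:

import logging

def zip_member_mapper(input_tuple):
    zipinfo, file_callable = input_tuple  # (zipfile.ZipInfo, callable)
    body = file_callable()  # returns the member's complete contents as a string
    logging.info('%s: %d bytes', zipinfo.filename, len(body))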
Running the Mapper
We can use the control API to start MapReduce jobs from our code without any human interaction. Add the following lines to the CSVUploadHandler from the previous post to start the MapReduce job automatically:
from mapreduce import control

name = 'Create datastore from CSV'
handler_spec = 'main.CSVDataHandler'
reader_spec = 'mapreduce.input_readers.BlobstoreLineInputReader'
reader_parameters = {'blob_keys': str(blob_info.key())}
logging.info(reader_parameters)
map_reduce_id = control.start_map(name, handler_spec, reader_spec, reader_parameters,
                                  shard_count=8,
                                  mapreduce_parameters=None,
                                  base_path="/mapreduce",
                                  queue_name="default",
                                  eta=None,
                                  countdown=None,
                                  hooks_class_name=None,
                                  _app=None,
                                  transactional=False)
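start_map returns the id of the new MapReduce job; while it runs you can follow its progress, shard status, and counters on the status page the framework serves at /mapreduce.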
Complete Source Code
:::app.yaml:::
application: tasks
version: 1
runtime: python
api_version: 1
handlers:
- url: /mapreduce(/.*)?
  script: mapreduce/main.py
- url: /.*
  script: main.py
:::main.py:::
from google.appengine.ext import webapp
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp.util import run_wsgi_app
from mapreduce import operation as op
from mapreduce import control
from models import CSV, CSVData
import logging

def CSVDataHandler(input_tuple):  # mapper function
    line = input_tuple[1]  # input_tuple is (byte_offset, line)
    input_data = line.split(',')
    csv_data = CSVData(PIN=input_data[0],
                       Path=input_data[1],
                       PhotoLatitude=input_data[2],
                       PhotoLongitude=input_data[3],
                       PhotoDirectionOfView=input_data[4],
                       PropertyCentroidLatitude=input_data[5],
                       PropertyCentroidLongitude=input_data[6],
                       UTFDateTimeTaken=input_data[7],
                       MatchQualityDescription=input_data[8])
    yield op.db.Put(csv_data)

class CSVFormHandler(webapp.RequestHandler):  # form for uploading the CSV file
    def get(self):
        upload_url = blobstore.create_upload_url('/upload')
        html = '<html><body>'
        html += '%s' % upload_url
        html += '<form action="%s" method="POST" enctype="multipart/form-data">' % upload_url
        html += """Upload File: <input type="file" name="file"><br>
<input type="submit" name="submit" value="Submit"> </form></body></html>"""
        self.response.out.write(html)

class CSVUploadHandler(blobstore_handlers.BlobstoreUploadHandler):  # processing the uploaded file
    def post(self):
        try:
            upload_files = self.get_uploads('file')
            logging.info('File upload received %s. File count=%d' % (upload_files[0].filename, len(upload_files)))
            if len(upload_files) > 0:
                blob_info = upload_files[0]
                csv = CSV(blob_key=blob_info.key(), blob_name=blob_info.filename,
                          blob_created=blob_info.creation, blob_size=blob_info.size)
                logging.info('csv created')
                csv_key = db.put(csv)
                logging.info('Blob stored key=%s csv_key=%s' % (blob_info.key(), csv_key))
                name = 'Create datastore from CSV'
                handler_spec = 'main.CSVDataHandler'
                reader_spec = 'mapreduce.input_readers.BlobstoreLineInputReader'
                reader_parameters = {'blob_keys': str(blob_info.key())}
                logging.info(reader_parameters)
                map_reduce_id = control.start_map(name, handler_spec, reader_spec, reader_parameters,
                                                  shard_count=8,
                                                  mapreduce_parameters=None,
                                                  base_path="/mapreduce",
                                                  queue_name="default",
                                                  eta=None,
                                                  countdown=None,
                                                  hooks_class_name=None,
                                                  _app=None,
                                                  transactional=False)
            self.redirect('/new')
        except Exception:
            logging.error('Error in processing the file')
            self.response.out.write('Error in processing the file')

application = webapp.WSGIApplication([('/new', CSVFormHandler),
                                      ('/upload', CSVUploadHandler)],
                                     debug=True)

def main():
    run_wsgi_app(application)

if __name__ == '__main__':
    main()
:::models.py:::
from google.appengine.ext import db
from google.appengine.ext import blobstore

class CSV(db.Model):
    blob_key = blobstore.BlobReferenceProperty(required=True)
    blob_name = db.StringProperty(required=True)
    blob_created = db.DateTimeProperty()
    blob_size = db.IntegerProperty()

class CSVData(db.Model):
    PIN = db.StringProperty()
    Path = db.StringProperty()
    PhotoLatitude = db.StringProperty()
    PhotoLongitude = db.StringProperty()
    PhotoDirectionOfView = db.StringProperty()
    PropertyCentroidLatitude = db.StringProperty()
    PropertyCentroidLongitude = db.StringProperty()
    UTFDateTimeTaken = db.StringProperty()
    MatchQualityDescription = db.StringProperty()