
    KB: Adding the MapReduce library to your application (Python code)

    TUESDAY, APRIL 19, 2011

    MapReduce is a patented software framework introduced by Google to support distributed computing on large data sets across clusters of computers. It is inspired by the map and reduce functions of functional programming, although their purpose in the MapReduce framework differs from their original forms.
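
    As a toy illustration of that functional-programming idea (plain Python, nothing App Engine specific; the data is made up), word counting expressed in map and reduce style:

    words = ['apple', 'banana', 'apple']
    pairs = map(lambda w: (w, 1), words)          # map: emit (key, value) pairs
    counts = {}
    for key, value in pairs:                      # group by key
        counts[key] = counts.get(key, 0) + value  # reduce: sum the values per key
    print counts                                  # {'apple': 2, 'banana': 1}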

    Check out the mapreduce folder into your application directory:

    svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce

    Add the mapreduce handler to your app.yaml:

    handlers:
    - url: /mapreduce(/.*)?
      script: mapreduce/main.py
    

    Defining a Mapper

    Create a function that takes a single argument. It will be called once for each value returned by the input reader. An example mapper for the BlobstoreLineInputReader might look like the sketch below.
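
    A minimal sketch, assuming a comma-delimited line and the CSVData model from the complete source further down (the function name CSVLineMapper is illustrative):

    from mapreduce import operation as op
    from models import CSVData

    def CSVLineMapper(input_tuple):
        # BlobstoreLineInputReader passes (byte_offset, line_value)
        byte_offset, line = input_tuple
        fields = line.split(',')
        entity = CSVData(PIN=fields[0])  # populate the remaining fields as needed
        yield op.db.Put(entity)          # yield mutations; the framework batches them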

    Specifying Readers

    The mapreduce library isn't restricted to mapping over datastore entities. It comes bundled with other input readers, defined in mapreduce/input_readers.py. Currently, this includes DatastoreInputReader (the default), BlobstoreLineInputReader, which maps over lines from one or more blobs in the blobstore, and BlobstoreZipInputReader, which maps over the contents of zip files in the blobstore.

    Provided Input Readers

    DatastoreInputReader

    DatastoreInputReader reads all model instances of a particular kind from the datastore. It requires the entity_kind parameter, which names a model class that must be defined and importable.

    entity_kind (default: None): the datastore kind to map over.
    namespaces (default: the current namespace): the list of namespaces that will be searched for entities of the given kind.
    batch_size (default: 50): the number of entities to read from the datastore with each batch get.
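
    As a sketch, starting a job with this reader might look like the call below; the mapper name main.CSVDataEntityHandler is hypothetical, and the entity_kind value assumes the CSVData model from the complete source:

    from mapreduce import control

    control.start_map('Process all CSVData entities',
                      'main.CSVDataEntityHandler',  # hypothetical mapper taking an entity
                      'mapreduce.input_readers.DatastoreInputReader',
                      {'entity_kind': 'models.CSVData', 'batch_size': 50},
                      shard_count=8)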

    DatastoreKeyInputReader

    DatastoreKeyInputReader reads the keys of entities of a particular kind from the datastore. Because only keys are read, it does not require the entity_kind model class to be defined.

    entity_kind (default: None): the datastore kind whose keys to map over.
    namespaces (default: the current namespace): the list of namespaces that will be searched for keys of the given kind.
    batch_size (default: 50): the number of keys to read from the datastore with each batch get.
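
    A common use for this reader is bulk deletion, since no model class is needed. A minimal sketch of a key mapper (the function name is illustrative):

    from mapreduce import operation as op

    def delete_entity(key):
        # DatastoreKeyInputReader passes the entity's db.Key
        yield op.db.Delete(key)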

    BlobstoreLineInputReader

    BlobstoreLineInputReader reads a delimited text file one line at a time. It calls the mapper once for each line, passing it a tuple consisting of the byte offset of the first character of the line within the file and the line itself as a string, without the trailing newline. In other words: (byte_offset, line_value).

    blob_keys (default: None): either a string containing a single blob key or a list of blob key strings.
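
    For example, the reader parameters for mapping over two uploaded blobs might look like this (blob_info_1 and blob_info_2 stand in for previously stored BlobInfo objects):

    reader_parameters = {'blob_keys': [str(blob_info_1.key()),
                                       str(blob_info_2.key())]}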

    BlobstoreZipInputReader

    BlobstoreZipInputReader iterates over all the member files of a zip file in the Blobstore. It calls the mapper once for each file, passing it a tuple consisting of the zipfile.ZipInfo entry for the file and a callable that returns the complete body of the file as a string. In other words: (zipinfo, file_callable).

    blob_key (default: None): a string containing a blob key.
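
    A minimal sketch of a zip mapper (the function name is illustrative):

    import logging

    def process_zip_member(input_tuple):
        # BlobstoreZipInputReader passes (zipfile.ZipInfo, callable)
        zipinfo, file_callable = input_tuple
        body = file_callable()  # the complete member body as a string
        logging.info('%s: %d bytes', zipinfo.filename, len(body))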

    Running the Mapper

    We can use the control API to start MapReduce jobs from our code without any human interaction. Add the lines below to the CSVUploadHandler from the previous post to start the job automatically:

    from mapreduce import control

    name = 'Create datastore from CSV'
    handler_spec = 'main.CSVDataHandler'
    reader_spec = 'mapreduce.input_readers.BlobstoreLineInputReader'
    reader_parameters = {'blob_keys': str(blob_info.key())}
    logging.info(reader_parameters)
    map_reduce_id = control.start_map(name, handler_spec, reader_spec,
                                      reader_parameters,
                                      shard_count=8,
                                      mapreduce_parameters=None,
                                      base_path="/mapreduce",
                                      queue_name="default",
                                      eta=None,
                                      countdown=None,
                                      hooks_class_name=None,
                                      _app=None,
                                      transactional=False)
    

    Complete Source Code

    :::app.yaml:::
    application: tasks
    version: 1
    runtime: python
    api_version: 1
    
    handlers:
    - url: /mapreduce(/.*)?
      script: mapreduce/main.py
    - url: /.*
      script: main.py
      
      
    :::main.py:::
    from google.appengine.ext import webapp
    from google.appengine.ext import blobstore
    from google.appengine.ext import db
    from google.appengine.ext.webapp import blobstore_handlers
    from google.appengine.ext.webapp.util import run_wsgi_app
    from mapreduce import operation as op
    from mapreduce import control
    from models import CSV, CSVData
    import logging

    def CSVDataHandler(input_tuple):  # mapper function
        # BlobstoreLineInputReader passes (byte_offset, line_value)
        line = input_tuple[1]
        input_data = line.split(',')
        csv_data = CSVData(PIN=input_data[0],
                           Path=input_data[1],
                           PhotoLatitude=input_data[2],
                           PhotoLongitude=input_data[3],
                           PhotoDirectionOfView=input_data[4],
                           PropertyCentroidLatitude=input_data[5],
                           PropertyCentroidLongitude=input_data[6],
                           UTFDateTimeTaken=input_data[7],
                           MatchQualityDescription=input_data[8])
        yield op.db.Put(csv_data)

    class CSVFormHandler(webapp.RequestHandler):  # form for uploading the CSV file
        def get(self):
            upload_url = blobstore.create_upload_url('/upload')
            html = '<html><body>'
            html += '%s' % upload_url
            html += '<form action="%s" method="POST" enctype="multipart/form-data">' % upload_url
            html += ('Upload File: <input type="file" name="file"><br> '
                     '<input type="submit" name="submit" value="Submit"> '
                     '</form></body></html>')
            self.response.out.write(html)

    class CSVUploadHandler(blobstore_handlers.BlobstoreUploadHandler):  # processing the uploaded file
        def post(self):
            try:
                upload_files = self.get_uploads('file')
                if len(upload_files) > 0:
                    blob_info = upload_files[0]
                    logging.info('File upload received %s.  File count=%d'
                                 % (blob_info.filename, len(upload_files)))
                    csv = CSV(blob_key=blob_info.key(),
                              blob_name=blob_info.filename,
                              blob_created=blob_info.creation,
                              blob_size=blob_info.size)
                    csv_key = db.put(csv)
                    logging.info('Blob stored key=%s  csv_key=%s'
                                 % (blob_info.key(), csv_key))
                    # Start the mapreduce job over the uploaded blob
                    name = 'Create datastore from CSV'
                    handler_spec = 'main.CSVDataHandler'
                    reader_spec = 'mapreduce.input_readers.BlobstoreLineInputReader'
                    reader_parameters = {'blob_keys': str(blob_info.key())}
                    logging.info(reader_parameters)
                    map_reduce_id = control.start_map(name, handler_spec, reader_spec,
                                                      reader_parameters,
                                                      shard_count=8,
                                                      mapreduce_parameters=None,
                                                      base_path="/mapreduce",
                                                      queue_name="default",
                                                      eta=None,
                                                      countdown=None,
                                                      hooks_class_name=None,
                                                      _app=None,
                                                      transactional=False)
                self.redirect('/new')
            except:
                logging.error('Error in processing the file')
                self.response.out.write('Error in processing the file')

    application = webapp.WSGIApplication([('/new', CSVFormHandler),
                                          ('/upload', CSVUploadHandler)],
                                         debug=True)

    def main():
        run_wsgi_app(application)

    if __name__ == '__main__':
        main()
    
    :::models.py:::
    from google.appengine.ext import db
    from google.appengine.ext import blobstore
    
    class CSV(db.Model):
        blob_key = blobstore.BlobReferenceProperty(required=True)
        blob_name = db.StringProperty(required=True)
        blob_created = db.DateTimeProperty()
        blob_size = db.IntegerProperty()

    class CSVData(db.Model):
        PIN = db.StringProperty()
        Path = db.StringProperty()
        PhotoLatitude = db.StringProperty()
        PhotoLongitude = db.StringProperty()
        PhotoDirectionOfView = db.StringProperty()
        PropertyCentroidLatitude = db.StringProperty()
        PropertyCentroidLongitude = db.StringProperty()
        UTFDateTimeTaken = db.StringProperty()
        MatchQualityDescription = db.StringProperty()
    
