Using Python and Common-Crawl to find products from Amazon.com

Common-Crawl (CC) is an Awesome free and open source collection of crawled data from the world wide web spanning back many years. Common-Crawl does what Google and Bing does but allows for anyone to access their information and analysis the information for free and to use the data commercially for free. The data sets recorded are now tipping many petabytes and are stored on AWS S3 for free courteous of Amazon.

Today we will be investing this available information by using Python and a couple plugins to analyze the stored raw HTML code.
The data stored is in a compressed format due to many pure HTML pages, hence finding specific pages can be difficult. Common-Crawl have provided a useful API (http://index.commoncrawl.org/) which Apps may access and use to find all pages with a specific domain name. When visiting the API page, you may notice how there is a long list of entries which start with "/CC-MAIN-...". This long list is the snapshots or datasets that Common-Crawl find each month, this means that you may actually go back many months or years to find and extract information. For this example, we will use the latest dataset CC-MAIN-2017-39-index.

Below is the API call we will use in our application. The first %s is the data set number so eg. "2017-39" and the second is the %s is the desired domain to search.

http://index.commoncrawl.org/CC-MAIN-%s-index?url=%s&matchType=domain&output=json


Pseudocode Break Down

  1. Search domain - Common-crawl API

  2. Add URLs to list

  3. Start saving thread

  4. Slip up URLs into two threads

  5. Loop through list of URLs, Download page and confirm if product. Full HTML Archive copy from Common-crawl

  6. If Page is a product, extract details and create new product Object

  7. Add object to save buffer, which adds it to DynamoDB

  8. Loop until list completed


### -----------------------
### Searches the Common Crawl Index for a domain.
### -----------------------
Searches the Common Crawl Index for a domain.
def search_domain(domain):
    record_list = []
    print "[*] Trying target domain: %s" % domain
    for index in index_list:
        print "[*] Trying index %s" % index
        cc_url  = "http://index.commoncrawl.org/CC-MAIN-%s-index?" % index
        cc_url += "url=%s&matchType=domain&output=json" % domain
        response = requests.get(cc_url)
        if response.status_code == 200:
            records = response.content.splitlines()
            for record in records:
                record_list.append(json.loads(record))  
            print "[*] Added %d results." % len(records)
    print "[*] Found a total of %d hits." % len(record_list)
    return record_list

The python code accepts a domain name to search and returns all the URLs which belong to the domain name from the common crawl dataset. Once the URLs have been downloaded and stored in a list, we can move on to downloading the compressed page and performing the analysis.

#---------------------------
# Downloads full page
#---------------------------
def download_page(record):

    offset, length = int(record['offset']), int(record['length'])
    offset_end = offset + length - 1

    # We'll get the file via HTTPS so we don't need to worry about S3 credentials
    # Getting the file on S3 is equivalent however - you can request a Range
    prefix = 'https://commoncrawl.s3.amazonaws.com/'

    # We can then use the Range header to ask for just this set of bytes
    resp = requests.get(prefix + record['filename'], headers={'Range': 'bytes={}-{}'.format(offset, offset_end)})

    # The page is stored compressed (gzip) to save space
    # We can extract it using the GZIP library
    raw_data = StringIO.StringIO(resp.content)
    f = gzip.GzipFile(fileobj=raw_data)

    # What we have now is just the WARC response, formatted:
    data = f.read()

    response = ""

    if len(data):
        try:
            warc, header, response = data.strip().split('\r\n\r\n', 2)
        except:
            pass

    return response

The python code above uses the inbuilt python library "requests" to download a compressed page from the dataset saved on Amazons AWS S3. The downloaded page is then extracted using the "Gzip" library and the raw HTML data is returned as the response. Once the page is downloaded we can use our custom function with the help of the python plugin "Beautifulsoup" to find specific data residing in the HTML code.

#---------------------
# Extract Product from the single HTML page.   
#----------------------
def extract_product(html_content, url):
    #String Buffer
    string_buffer = "" 
    errs = list()

    #Read page and read to extract product infomation
    parser = BeautifulSoup(html_content, "html.parser")  

    #Check if the page is a product, if not skip page.
    truth, asin = check_page(parser)
    if not truth:
        errs.append("Not product")
        return (False, errs)

    #New Product as a object
    product = Product()
    #New Keyword rank
    keyword = Rake(SmartStopList.words())


    #Find URL
    product.SetUrl(url)

    #Find Brand: Note: Some products have an image for the brand 
    truth, string_buffer = search_table(parser, {"id": "productDetails_techSpec_section_1"}, "Brand Name")
    if truth:
        product.SetBrand(string_buffer)
    else:
        string_buffer = parser.find("a", attrs={"id": "brand"})
        if string_buffer != None:
            product.SetBrand(string_buffer.get_text().strip())
        else:
            errs.append("Could not find Brand")

    #Find Title
    string_buffer = parser.find("span", attrs={"id": "productTitle"})
    if string_buffer != None:
        product.SetTitle(string_buffer.get_text().strip())
    else:
        errs.append("Could not find Title")
        return (False, errs) 

    #Find Image
    string_buffer = parser.find("img", attrs={"id": "landingImage"})
    if string_buffer != None:
        string_buffer = string_buffer.get("data-old-hires")
        if len(string_buffer) < 2:
            string_buffer = parser.find("img", attrs={"id": "landingImage"}).get("data-a-dynamic-image")
            m = re.search('https://(.+?).jpg', string_buffer)
            if m:
                string_buffer = m.group(1)
                string_buffer = "https://{}.jpg".format(string_buffer)
        #print ("Img Url: "+string_buffer)
        product.SetImage(string_buffer)
    else:
        errs.append("Could not find Image")

    #Find Small Blob
    #TODO: Need to perform keyword analysis
    string_buffer = parser.find("div", attrs={"id": "feature-bullets"})
    if string_buffer != None:
        string_buffer = string_buffer.find("ul")
    try:        
        string_buffer = string_buffer.find_all("li")
        if string_buffer != None:
            string_buffer_2 = ""
            for span in string_buffer:
                string_buffer_3 = span.find("span")
                if string_buffer_3 != None:
                    string_buffer_3 = string_buffer_3.get_text()
                    try:
                        string_buffer_2 = "{} {}".format(string_buffer_2, string_buffer_3.strip())
                    except: 
                        pass
            saved_buffer = string_buffer_2.strip()
            #Calculating Key Words
            keywords_1 = keyword.run(saved_buffer)
            product.SetSmallBlog(keywords_1)
    except:	
        errs.append("Error finding li")

    else:
        errs.append("Could not find small section keywords")


    #Find Large Blob
    #TODO: Need to perform keyword analysis
    string_buffer = parser.find("div", attrs={"id": "productDescription"})
    if string_buffer != None:
        string_buffer = string_buffer.find("p")
    if string_buffer != None:	
        string_buffer = string_buffer.get_text()
        saved_buffer = string_buffer.strip()
        #Calculating Key Words
        keywords_2 = keyword.run(saved_buffer)
        product.SetLargeBlob(keywords_2)
    else:
        errs.append("Could not find large section keywords")

    #Find ASIN
    product.SetSourceID(asin)

    #TODO: Perform price save!

    #Append the product to large list of products
    if product.FormCompleted():
        return (product, errs)
    else:
        return (False, errs)

Now, there is a bit to explain here.

The function above accepts two inputs, the HTML code and the URL for the page. The page first initialises the BeautifulSoup library to a variable called parser. The function will then check if the page that it is currently inspecting is definitely a products page, as the common crawl API will return pages and URLs which are mixed with menus, corporate info, deals pages, etc. This is done using the function below.

#-------------------
# Perform Precheck to see if page is a product
#-------------------
def check_page(parsed):
    parser = parsed

    #First Check of ASIN
    found, asin = search_table(parser, {"id": "productDetails_detailBullets_sections1"}, "ASIN")
    if found:
        return (True, asin)

    #Second Check of ASIN
    check_asin_2 = parser.find("b", text="ASIN:")    
    check_asin_3 = parser.find("b", text="ASIN: ")    

    if check_asin_2 == None and check_asin_3 == None:
        print("Page is Not a Product")
        return (False, None)
    else:
        if check_asin_2 != None:
            asin = check_asin_2.findParent().text[5:]
        if check_asin_3 != None:
            asin = check_asin_3.findParent().text[5:]
        #TODO: Add additional checks to confirm the page is definatly a product!
        print("Page is a Product")
        return (True, asin)

The function above using Beautifulsoup to find certain HTML elements for example divs, spans, and in this case bold statements and checks the content within the bold statement. If the statement contains a string called "ASIN:" then we can be assured that there is a high chance that the page is definitely a product and the function returns the Asin and a boolean True.

If the page is recognised as a product then the extract_product function will create a Product object to store the information. The class is a good way to store and manage the information as there is a specific model and not randomly stored in a JSON format.

########  Product Class   ########
class Product:
    title = "e"
    brand = "e"
    url = "e"
    image_url = "e"
    blob_small = "Unknown"
    blob_large = "Unknown"
    source_id = "asin"
    source_domain = "amazon"
    
    ## Inti
    def __init__(self, product=None ):
        #Initialise Object with a Json array instead of using Setters.
        if product != None:           
            self.title = product.title
            self.brand = product.brand
            self.url = product.url
            self.images = product.images
            self.blob_small = product.blob_small
            self.blob_large = product.blob_large
            self.source_id = product.source_id
            self.source_domain = product.source_domain
        print("New Product object Initialised in memory")
        
    ## Setters and Getters    
    def SetTitle(self, title):
        self.title = title.strip()

    def SetBrand(self, brand):
        self.brand = brand    
    
    def SetUrl(self, url):
        self.url = url
        
    def SetImage(self, url):
        if len(url) > 1:
            self.image_url = url
    
    def SetSmallBlog(self, blob):
        self.blob_small = blob
    
    def SetLargeBlob(self, blob):
        self.blob_large = blob
        
    def SetSourceID(self, id):
        #Strip removes white spaces and any other none standard chars
        self.source_id = id.strip()
    
    def SetSourceDomain(self, domain):
        self.source_domain = domain
    
    
    ## Support 
    def FormCompleted(self):
        #TODO: Returns True if the fields have been filled in.
        if len(self.title) > 1 and len(self.brand) > 1 and len(self.url) > 1 and len(self.source_id) > 1 and len(self.source_domain) > 1 :
            return True
        else:
            return True

    def ReturnJson(self):
        #Reutnrs Object infomation in form of a Json array
        m = hashlib.md5()
        m.update(self.source_id)
        product = {
            'uid':        m.hexdigest(), #Set as main index in DynamoDB
            'title':      self.title,
            'brand':      self.brand,
            'url':        self.url,
            'image_url':     self.image_url,
            'small_keywords': self.blob_small,
            'large_keywords': self.blob_large,
            'sid':        self.source_id,
            'domain':     self.source_domain,
            'date':       strftime("%Y-%m-%d %H:%M:%S", gmtime())
        }
        return (product)

    def Print(self):
        print("### Printing Product ###")
        print(self.ReturnJson())
        print("###        end       ###")

The product class is not too difficult as it contains setters and getters as well as specific functions which helps generate a JSON object and prints the information to the terminal.

Once the product object is created and complete with information then the object is added to a buffer which a secondary multithreaded function handles the save feature of the function to an AWS DynamoDB database. Below is the save thread class which handles the connection and processing.

### ------------------------------------
###    Save Products to DynamoDB Class
### ------------------------------------
class SaveProducts:
    
    products_buffer = list()
    
    #Constructor function
    def __init__ (self):
        ### Save prodct into database
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table('productfinder_product_2')
        #Helper
        self.stopped = False
    
    ###---------------------------------------------------
    ###   Main handler function for the multi threading
    ###--------------------------------------------------- 
    def start(self):
        Thread(target=self.update, args=()).start()
        return self
    
    ### Runs on Multi Thread
    def update(self):
        with self.table.batch_writer() as batch:
            #Keep Running for Thread Life
            while True:
                # keep looping infinitely until the thread is stopped
                if len(self.products_buffer) > 0:
                    try:
                        self.table.put_item(Item = self.products_buffer[0].ReturnJson()) #Save oldest product
                        self.products_buffer.pop(0) #Remove oldest product
                        print("[**] Successfully Uploaded Product")
                        print("[*] Buffer Size: {}".format(len(self.products_buffer)))
                    except:
                        #Failed to save product into db.
                        #TODO: Add err message
                        print("[-] Upload Error")
                        self.stopped = True
                        

                # if the thread indicator variable is set, stop the thread
                # and resource camera resources
                if self.stopped:
                        return
                    
    def append(self, product):
        # Append product into buffer
        if product != None:
            self.products_buffer.append(product)
            print("yes")
            
    def alive(self):
        if len(self.products_buffer) < 1: 
            return False
        else:
            return True
        
    def stop(self):
        # indicate that the thread should be stopped
        self.stopped = True

Please note when creating a DynamoDB table, enter a string to the table index. In this case we use "uid".
Finally we can wrap the whole python application with the main function

### -----------------------
###     Main Function
### -----------------------
def main():
    print("Starting CommonCrawl Search")
    #Finds all relevant domins
    record_list = search_domain(domain)
    
    #Creating save object - Products are saved to Amazon DynamoDB
    savethread = SaveProducts().start()
    
    #Downloads page from CommconCrawl and Inspects, then Extracts infomation
    product_finder_1 = ProductFinder(record_list[0: int(len(record_list)/2)]).start(savethread)
    product_finder_2 = ProductFinder(record_list[int(len(record_list)/2): int(len(record_list))]).start(savethread)
    
    #Idle Main Thread
    while product_finder_1.check_status() != True and product_finder_2.check_status() != True:
        time.sleep(1)
        
    while savethread.alive(): 
        time.sleep(1)
        
    #Stop Threads    
    product_finder_1.stop()
    product_finder_2.stop()
    savethread.stop()

if __name__ == '__main__':
    main()
    #Fin

Here we see the URLs found, save thread created and started, as well as two product finder classes which handle half the URL list each. To prevent the main function finishing and closing before the multithreaded classes, we enable idlers which prevent the main function closing before the URL lists have emptied and the save product buffer is emptied as well.

Please find my GitHub page to see the full version of the code:
https://github.com/chedame/python-common-crawl-amazon-example

Thank you for the read and as all ways, Stay Awesome!

Also, note the application works best on Unix based machines, for example, Linux and Mac.
I'm currently using an Apple MacBook Pro to run this code.