Common Crawl (CC) is an awesome free and open source collection of crawled data from the world wide web spanning back many years. Common Crawl does what Google and Bing do, but lets anyone access the data, analyse it, and use it commercially, all for free. The datasets now total many petabytes and are stored on AWS S3 free of charge, courtesy of Amazon.
Today we will be investigating this available information by using Python and a couple of libraries to analyse the stored raw HTML code.
The data is stored in a compressed format because it contains many raw HTML pages, so finding specific pages can be difficult. Common Crawl provides a useful API (http://index.commoncrawl.org/) which applications can use to find all pages under a specific domain name. When visiting the API page, you may notice a long list of entries which start with “CC-MAIN-…”. These are the snapshots, or datasets, that Common Crawl produces each month, which means you can go back many months or years to find and extract information. For this example, we will use the latest dataset, CC-MAIN-2017-39-index.
Below is the API call we will use in our application. The first %s is the dataset number, e.g. “2017-39”, and the second %s is the domain to search.
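As a quick sketch of how the two placeholders get filled in (the dataset id and domain here are just example values):

```python
# The index API URL template used by the application; both %s placeholders
# are filled in at request time.
cc_index_url = "http://index.commoncrawl.org/CC-MAIN-%s-index?url=%s&matchType=domain&output=json"

# Example: search the 2017-39 snapshot for amazon.com (illustrative values).
url = cc_index_url % ("2017-39", "amazon.com")
print(url)
```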
Pseudocode Breakdown
- Search the domain via the Common Crawl API
- Add the returned URLs to a list
- Start the saving thread
- Split the URLs across two threads
- Loop through the list of URLs, download each page (a full HTML archive copy from Common Crawl) and confirm whether it is a product
- If the page is a product, extract the details and create a new Product object
- Add the object to the save buffer, which uploads it to DynamoDB
- Loop until the list is completed
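The “split the URLs across two threads” step is simply halving the list; a minimal sketch with stand-in data:

```python
# Stand-in for the list of URL records found by the index search.
record_list = list(range(10))

# Each thread gets one half of the list, split at the midpoint.
half = int(len(record_list) / 2)
first_half = record_list[0:half]
second_half = record_list[half:len(record_list)]

# Together the two halves cover the whole list exactly once.
assert first_half + second_half == record_list
```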
### -----------------------
### Searches the Common Crawl Index for a domain.
### -----------------------
import json
import requests

# index_list holds the snapshot ids to search, e.g. index_list = ["2017-39"]
def search_domain(domain):
    record_list = []
    print("[*] Trying target domain: %s" % domain)
    for index in index_list:
        print("[*] Trying index %s" % index)
        cc_url = "http://index.commoncrawl.org/CC-MAIN-%s-index?" % index
        cc_url += "url=%s&matchType=domain&output=json" % domain
        response = requests.get(cc_url)
        if response.status_code == 200:
            records = response.content.splitlines()
            for record in records:
                record_list.append(json.loads(record))
            print("[*] Added %d results." % len(records))
    print("[*] Found a total of %d hits." % len(record_list))
    return record_list
The Python code accepts a domain name to search and returns all the URLs belonging to that domain found in the Common Crawl dataset. Once the URLs have been downloaded and stored in a list, we can move on to downloading the compressed pages and performing the analysis.
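Each record returned by the index tells us where the page lives inside a large archive file: a filename plus a byte offset and length. The download step below turns those two numbers into an HTTP Range header. A small sketch (the field names follow the Common Crawl index format; the values are made up for illustration):

```python
# A single index record as returned by the Common Crawl index API.
# The filename is shortened here; real values point at a .warc.gz archive.
record = {"filename": "crawl-data/example.warc.gz",
          "offset": "1000", "length": "250"}

# The record occupies bytes [offset, offset + length - 1] of the archive.
offset, length = int(record["offset"]), int(record["length"])
offset_end = offset + length - 1
range_header = "bytes={}-{}".format(offset, offset_end)
print(range_header)  # bytes=1000-1249
```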
#---------------------------
# Downloads full page
#---------------------------
import io
import gzip
import requests

def download_page(record):
    offset, length = int(record['offset']), int(record['length'])
    offset_end = offset + length - 1
    # We'll get the file via HTTPS so we don't need to worry about S3 credentials.
    # Getting the file on S3 is equivalent however - you can request a Range.
    prefix = 'https://commoncrawl.s3.amazonaws.com/'
    # We can then use the Range header to ask for just this set of bytes.
    resp = requests.get(prefix + record['filename'],
                        headers={'Range': 'bytes={}-{}'.format(offset, offset_end)})
    # The page is stored compressed (gzip) to save space;
    # we can extract it using the gzip library.
    raw_data = io.BytesIO(resp.content)
    f = gzip.GzipFile(fileobj=raw_data)
    # What we have now is the raw WARC record: WARC header, HTTP header, HTML body.
    data = f.read().decode('utf-8', errors='replace')
    response = ""
    if len(data):
        try:
            warc, header, response = data.strip().split('\r\n\r\n', 2)
        except ValueError:
            pass
    return response
The Python code above uses the “requests” library to download a compressed page from the dataset stored on Amazon's S3. The downloaded page is then decompressed using the “gzip” library, and the raw HTML data is returned as the response. Once the page is downloaded, we can use our custom function, with the help of the “BeautifulSoup” library, to find specific data residing in the HTML code.
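Before diving into the full extraction function, here is the BeautifulSoup pattern it relies on, shown on a toy snippet (assuming beautifulsoup4 is installed; real pages are full Amazon product pages):

```python
from bs4 import BeautifulSoup

# A toy HTML snippet standing in for a downloaded product page.
html = '<html><body><span id="productTitle"> Example Product </span></body></html>'
parser = BeautifulSoup(html, "html.parser")

# find() locates the first element matching the tag name and attributes.
title = parser.find("span", attrs={"id": "productTitle"})
print(title.get_text().strip())  # Example Product
```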
#---------------------------
# Extract Product from the single HTML page.
#---------------------------
def extract_product(html_content, url):
    #String buffer
    string_buffer = ""
    errs = list()
    #Parse the page so we can extract the product information
    parser = BeautifulSoup(html_content, "html.parser")
    #Check if the page is a product; if not, skip the page.
    truth, asin = check_page(parser)
    if not truth:
        errs.append("Not product")
        return (False, errs)
    #New Product object
    product = Product()
    #New keyword ranker (Rake with a smart stop-word list)
    keyword = Rake(SmartStopList.words())
    #Set URL
    product.SetUrl(url)
    #Find Brand. Note: some products have an image for the brand
    truth, string_buffer = search_table(parser, {"id": "productDetails_techSpec_section_1"}, "Brand Name")
    if truth:
        product.SetBrand(string_buffer)
    else:
        string_buffer = parser.find("a", attrs={"id": "brand"})
        if string_buffer is not None:
            product.SetBrand(string_buffer.get_text().strip())
        else:
            errs.append("Could not find Brand")
    #Find Title
    string_buffer = parser.find("span", attrs={"id": "productTitle"})
    if string_buffer is not None:
        product.SetTitle(string_buffer.get_text().strip())
    else:
        errs.append("Could not find Title")
        return (False, errs)
    #Find Image
    string_buffer = parser.find("img", attrs={"id": "landingImage"})
    if string_buffer is not None:
        string_buffer = string_buffer.get("data-old-hires")
        if string_buffer is None or len(string_buffer) < 2:
            string_buffer = parser.find("img", attrs={"id": "landingImage"}).get("data-a-dynamic-image")
            m = re.search('https://(.+?).jpg', string_buffer)
            if m:
                string_buffer = m.group(1)
                string_buffer = "https://{}.jpg".format(string_buffer)
        if string_buffer:
            #print("Img Url: " + string_buffer)
            product.SetImage(string_buffer)
    else:
        errs.append("Could not find Image")
    #Find small blob of keywords from the feature bullets
    string_buffer = parser.find("div", attrs={"id": "feature-bullets"})
    if string_buffer is not None:
        string_buffer = string_buffer.find("ul")
        try:
            string_buffer = string_buffer.find_all("li")
            if string_buffer is not None:
                string_buffer_2 = ""
                for span in string_buffer:
                    string_buffer_3 = span.find("span")
                    if string_buffer_3 is not None:
                        string_buffer_3 = string_buffer_3.get_text()
                        try:
                            string_buffer_2 = "{} {}".format(string_buffer_2, string_buffer_3.strip())
                        except:
                            pass
                saved_buffer = string_buffer_2.strip()
                #Calculate keywords
                keywords_1 = keyword.run(saved_buffer)
                product.SetSmallBlog(keywords_1)
        except:
            errs.append("Error finding li")
    else:
        errs.append("Could not find small section keywords")
    #Find large blob from the product description
    string_buffer = parser.find("div", attrs={"id": "productDescription"})
    if string_buffer is not None:
        string_buffer = string_buffer.find("p")
        if string_buffer is not None:
            string_buffer = string_buffer.get_text()
            saved_buffer = string_buffer.strip()
            #Calculate keywords
            keywords_2 = keyword.run(saved_buffer)
            product.SetLargeBlob(keywords_2)
    else:
        errs.append("Could not find large section keywords")
    #Set ASIN
    product.SetSourceID(asin)
    #TODO: Perform price save!
    #Return the product if the required fields were filled in
    if product.FormCompleted():
        return (product, errs)
    else:
        return (False, errs)
Now, there is a bit to explain here.
The function above accepts two inputs: the HTML code and the URL of the page. It first initialises the BeautifulSoup library into a variable called parser. The function then checks whether the page it is currently inspecting is definitely a product page, as the Common Crawl API returns pages and URLs mixed with menus, corporate info, deals pages, etc. This is done using the function below.
#---------------------------
# Perform precheck to see if the page is a product
#---------------------------
def check_page(parsed):
    parser = parsed
    #First check for the ASIN
    found, asin = search_table(parser, {"id": "productDetails_detailBullets_sections1"}, "ASIN")
    if found:
        return (True, asin)
    #Second check for the ASIN
    check_asin_2 = parser.find("b", text="ASIN:")
    check_asin_3 = parser.find("b", text="ASIN: ")
    if check_asin_2 == None and check_asin_3 == None:
        print("Page is Not a Product")
        return (False, None)
    else:
        if check_asin_2 != None:
            asin = check_asin_2.findParent().text[5:]
        if check_asin_3 != None:
            asin = check_asin_3.findParent().text[5:]
        #TODO: Add additional checks to confirm the page is definitely a product!
        print("Page is a Product")
        return (True, asin)
The function above uses BeautifulSoup to find certain HTML elements, for example divs, spans, and in this case bold tags, and checks the content within the bold tag. If it contains the string “ASIN:” then there is a high chance the page is indeed a product, and the function returns the ASIN and a boolean True.
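To make the text[5:] slice in check_page concrete, here is the pattern on a toy snippet (assuming beautifulsoup4 is installed; the ASIN value is made up):

```python
from bs4 import BeautifulSoup

# Toy markup mimicking the "ASIN: ..." pattern the check looks for.
html = '<li><b>ASIN:</b> B00EXAMPLE</li>'
parser = BeautifulSoup(html, "html.parser")

# Find the bold tag whose text is exactly "ASIN:".
bold = parser.find("b", text="ASIN:")

# findParent().text is "ASIN: B00EXAMPLE"; slicing off the first
# 5 characters ("ASIN:") leaves the id itself.
asin = bold.findParent().text[5:].strip()
print(asin)  # B00EXAMPLE
```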
If the page is recognised as a product, the extract_product function creates a Product object to store the information. A class is a good way to store and manage the information, as it gives the data a specific model rather than leaving it loosely stored in a JSON format.
######## Product Class ########
import hashlib
from time import strftime, gmtime

class Product:
    title = "e"
    brand = "e"
    url = "e"
    image_url = "e"
    blob_small = "Unknown"
    blob_large = "Unknown"
    source_id = "asin"
    source_domain = "amazon"
    ## Init
    def __init__(self, product=None):
        #Initialise the object from an existing product instead of using setters.
        if product != None:
            self.title = product.title
            self.brand = product.brand
            self.url = product.url
            self.image_url = product.image_url
            self.blob_small = product.blob_small
            self.blob_large = product.blob_large
            self.source_id = product.source_id
            self.source_domain = product.source_domain
        print("New Product object initialised in memory")
    ## Setters and Getters
    def SetTitle(self, title):
        self.title = title.strip()
    def SetBrand(self, brand):
        self.brand = brand
    def SetUrl(self, url):
        self.url = url
    def SetImage(self, url):
        if len(url) > 1:
            self.image_url = url
    def SetSmallBlog(self, blob):
        self.blob_small = blob
    def SetLargeBlob(self, blob):
        self.blob_large = blob
    def SetSourceID(self, id):
        #Strip removes whitespace and any other non-standard chars
        self.source_id = id.strip()
    def SetSourceDomain(self, domain):
        self.source_domain = domain
    ## Support
    def FormCompleted(self):
        #Returns True only if the required fields have been filled in.
        if len(self.title) > 1 and len(self.brand) > 1 and len(self.url) > 1 and len(self.source_id) > 1 and len(self.source_domain) > 1:
            return True
        else:
            return False
    def ReturnJson(self):
        #Returns the object information in the form of a JSON-style dict
        m = hashlib.md5()
        m.update(self.source_id.encode('utf-8'))
        product = {
            'uid': m.hexdigest(), #Set as main index in DynamoDB
            'title': self.title,
            'brand': self.brand,
            'url': self.url,
            'image_url': self.image_url,
            'small_keywords': self.blob_small,
            'large_keywords': self.blob_large,
            'sid': self.source_id,
            'domain': self.source_domain,
            'date': strftime("%Y-%m-%d %H:%M:%S", gmtime())
        }
        return product
    def Print(self):
        print("### Printing Product ###")
        print(self.ReturnJson())
        print("### end ###")
The Product class is not too difficult: it contains setters and getters, as well as helper functions which generate a JSON object and print the information to the terminal.
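One detail worth calling out is how ReturnJson derives the DynamoDB index: the uid is an MD5 hex digest of the ASIN, so the same product always hashes to the same key. In Python 3 the string must be encoded to bytes before hashing (the ASIN here is a made-up example):

```python
import hashlib

asin = "B00EXAMPLE"
m = hashlib.md5()
# encode() is required in Python 3; md5 only accepts bytes.
m.update(asin.encode("utf-8"))
uid = m.hexdigest()
print(uid)  # a 32-character hex string, stable for a given ASIN
```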
Once the product object is created and complete with information, it is added to a buffer; a secondary multithreaded class then handles saving it to an AWS DynamoDB database. Below is the save-thread class which handles the connection and processing.
### ------------------------------------
### Save Products to DynamoDB Class
### ------------------------------------
import boto3
from threading import Thread

class SaveProducts:
    products_buffer = list()
    #Constructor function
    def __init__(self):
        ### Connect to the database table
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table('productfinder_product_2')
        #Helper
        self.stopped = False
    ###---------------------------------------------------
    ### Main handler function for the multi threading
    ###---------------------------------------------------
    def start(self):
        Thread(target=self.update, args=()).start()
        return self
    ### Runs on its own thread
    def update(self):
        #Keep running for the life of the thread
        while True:
            # keep looping until the thread is stopped
            if len(self.products_buffer) > 0:
                try:
                    self.table.put_item(Item=self.products_buffer[0].ReturnJson()) #Save oldest product
                    self.products_buffer.pop(0) #Remove oldest product
                    print("[**] Successfully Uploaded Product")
                    print("[*] Buffer Size: {}".format(len(self.products_buffer)))
                except:
                    #Failed to save the product into the db; stop the thread.
                    #TODO: Add err message
                    print("[-] Upload Error")
                    self.stopped = True
            # if the thread indicator variable is set, stop the thread
            # and release its resources
            if self.stopped:
                return
    def append(self, product):
        # Append product into buffer
        if product != None:
            self.products_buffer.append(product)
            print("[*] Product added to save buffer")
    def alive(self):
        if len(self.products_buffer) < 1:
            return False
        else:
            return True
    def stop(self):
        # indicate that the thread should be stopped
        self.stopped = True
Please note, when creating the DynamoDB table, set the table's primary index to a string attribute. In this case we use “uid”.
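For reference, these are the parameters you would pass to boto3's create_table for such a table (the throughput values are just examples); note the "S" (string) type on the "uid" partition key:

```python
# Parameters for creating the table used above (throughput values are examples).
table_params = {
    "TableName": "productfinder_product_2",
    "KeySchema": [{"AttributeName": "uid", "KeyType": "HASH"}],  # partition key
    "AttributeDefinitions": [{"AttributeName": "uid", "AttributeType": "S"}],  # "S" = string
    "ProvisionedThroughput": {"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
}

# With AWS credentials configured, the table would be created like so:
# dynamodb = boto3.resource('dynamodb')
# dynamodb.create_table(**table_params)
```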
Finally, we can wrap the whole Python application up with the main function.
### -----------------------
### Main Function
### -----------------------
import time

def main():
    print("Starting CommonCrawl Search")
    #Find all relevant records for the domain (domain is set globally, e.g. domain = "amazon.com")
    record_list = search_domain(domain)
    #Create the save object - products are saved to Amazon DynamoDB
    savethread = SaveProducts().start()
    #Download pages from Common Crawl, inspect them, then extract the information.
    #Each finder handles half of the record list.
    product_finder_1 = ProductFinder(record_list[0: int(len(record_list)/2)]).start(savethread)
    product_finder_2 = ProductFinder(record_list[int(len(record_list)/2): len(record_list)]).start(savethread)
    #Idle the main thread until both finders have finished
    while not (product_finder_1.check_status() and product_finder_2.check_status()):
        time.sleep(1)
    #...and until the save buffer has emptied
    while savethread.alive():
        time.sleep(1)
    #Stop the threads
    product_finder_1.stop()
    product_finder_2.stop()
    savethread.stop()

if __name__ == '__main__':
    main()
#Fin
Here we see the URLs found, the save thread created and started, and two product finder classes which each handle half of the URL list. To prevent the main function finishing and closing before the multithreaded classes are done, we add idle loops which keep the main function alive until the URL lists have emptied and the save product buffer is emptied as well.
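The ProductFinder class referenced in main() is not shown in this post (the full version is on GitHub). As a rough, hypothetical sketch of its shape, based purely on how main() uses it (start(savethread), check_status(), stop()), with process_record standing in for the real download_page/extract_product pipeline:

```python
import threading

class ProductFinder:
    """Hypothetical sketch; the real class lives in the full code on GitHub."""
    def __init__(self, record_list, process_record):
        self.record_list = record_list
        # process_record stands in for download_page + extract_product:
        # it takes a record and returns a product, or False if not a product.
        self.process_record = process_record
        self.finished = False
        self.stopped = False

    def start(self, savethread):
        # Remember where to push finished products, then run on a worker thread.
        self.savethread = savethread
        threading.Thread(target=self.update, daemon=True).start()
        return self

    def update(self):
        # Work through our half of the record list, pushing products to the saver.
        for record in self.record_list:
            if self.stopped:
                return
            product = self.process_record(record)
            if product:
                self.savethread.append(product)
        self.finished = True

    def check_status(self):
        return self.finished

    def stop(self):
        self.stopped = True
```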
Please visit my GitHub page to see the full version of the code:
https://github.com/chedame/python-common-crawl-amazon-example
Thank you for the read and, as always, Stay Awesome!
Also, note the application works best on Unix-based machines, for example Linux and Mac.
I’m currently using an Apple MacBook Pro to run this code.