DEV Community

J
J

Posted on

Asynchronous all the way: exploring MongoDB and building my first data pipeline.

(This post will get me hired.)
Today, finally, all my useless data will get dumped (dump harder) into a big hole.

Image description

Imagine my first official data-scraping project, aimed at New Zealand's biggest Amazon-like platform — Trademe. I need to trim away the sensitive data and select only the "valid" data to store, "manually" comparing all the keys from the dictionaries.
One of the 500 JSON responses:

Image description

Imagine: pure gold from a newbie.
How can i store this fucking shit?
Here it goes:

import time
import asyncio
import aiohttp
from datetime import datetime
import csv
import re

# Matches the run of digits inside Trademe's "/Date(1670287284756)/" strings.
_EPOCH_MS_RE = re.compile(r"\d+")


def _epoch_ms_to_str(raw):
    """Convert a '/Date(<millis>)/'-style string to a local datetime string."""
    millis = int(_EPOCH_MS_RE.findall(raw)[0])
    return str(datetime.fromtimestamp(millis / 1000))


async def get_responses(number):
    """Fetch one page of listings and flatten each listing into a row of `long` columns.

    Relies on module-level globals: `url`, `headers`, `long` (ordered list of
    columns to keep) and `low_list` (keys to discard).  Missing columns are
    filled with "/" so every row has the same width.
    """
    page_label = str(number)
    print("Start at " + page_label)
    params = {
        'page': f'{number}',
        'rows': '3000',
        }
    async with aiohttp.ClientSession() as session:
        async with session.get(url=url, headers=headers, params=params) as resp:
            very_raw = await resp.json()
            for local_dic in very_raw["List"]:
                # Keys present on this listing that we actually want to keep.
                local_key_list = [k for k in local_dic if k not in low_list]
                final_list = []
                for key in long:
                    if key not in local_key_list:
                        final_list.append("/")  # placeholder for a missing column
                    elif key in ("StartDate", "EndDate"):
                        final_list.append(_epoch_ms_to_str(local_dic[key]))
                    elif key == "OpenHomes":
                        open_homes = local_dic[key]
                        if open_homes:
                            # Convert every viewing slot — the original copy-pasted
                            # code handled only lists of length 1 or 2 and silently
                            # dropped listings with 3+ open homes.
                            for slot in open_homes:
                                slot["Start"] = _epoch_ms_to_str(slot["Start"])
                                slot["End"] = _epoch_ms_to_str(slot["End"])
                            final_list.append(open_homes)
                        else:
                            final_list.append(" ")
                    else:
                        final_list.append(local_dic[key])

                # with open("120920223.csv", "a", newline="") as file:
                #     writer = csv.writer(file)
                #     writer.writerow(final_list)
            print("Finished " + page_label)



async def main(pages=10):
    """Fire one get_responses task per page and run them concurrently.

    `pages` replaces the `range(/)` placeholder the original post used to
    redact the real page count; the default keeps `main()` callable as before.
    """
    await asyncio.gather(*(get_responses(number) for number in range(pages)))


if __name__ == '__main__':

    # Columns to keep (in CSV output order)...
    long = ['ListingId', 'StartDate', 'Title', 'EndDate', 'Region', 'Suburb', 'OpenHomes', 'GeographicLocation',
            'PriceDisplay', 'HasEmbeddedVideo', 'Has3DTour', 'Address', 'District', 'Area', 'LandArea', 'Bathrooms',
            'Bedrooms', 'Parking', 'PropertyType', 'RateableValue', 'AdjacentSuburbNames', 'Agency', 'TotalParking']
    # ...and keys to discard from every listing.
    low_list = ['StartPrice', 'Category', 'AsAt', 'CategoryPath', 'PictureHref', 'RegionId', 'ListingLength',
                'IsFeatured', 'HasGallery', 'IsBold', 'IsHighlighted', 'IsClassified', 'NoteDate', 'SuburbId',
                'ReserveState', 'PhotoUrls', 'AdditionalData', 'ListingGroup', 'ListingExtras', 'MemberId',
                'AgencyReference', 'DistrictId', 'AdjacentSuburbIds' ]
    headers = {
        'authority': 'api.trademe.co.nz',
        'method': 'GET',
        # Fixed typo: original sent 'pplication/json, ...'.
        'accept': 'application/json, text/plain, */*',
        'accept-encoding': 'gzip, deflate, br',
        'accept-language': 'en,en-US;q=0.9',
        'cache-control': 'max-age=0',
        'newrelic': 'eyJ2IjpbMCwx=XSwiZCI6eyJ0eSI6IkJyb3dzZXIiLCJhYyI6IjQzODYzOCIsImFwIjoiMzgwMDc2Nzg0IiwiaWQiOiIxOGZjMWRkNzM1YTE4MmViIiwidHIiOiI4NGRiNzIyODE2Yjk1NzhjZDEzMTIxYjc3MGQ3MzQwMCIsInRpIjoxNjcwMjg3Mjg0NzU2fX0=',
        'origin': 'https://www.trademe.co.nz',
        'referer': 'https://www.trademe.co.nz/',
        'sec-ch-ua': '"Not?A_Brand";v="8", "Chromium";v="108", "Google Chrome";v="108"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'some-site',
        'upgrade-insecure-requests': '1',
        # 'x-trademe-uniqueclientid': '51f6a3d6-24e6-c673-45fe-8646ad15be04'

    }
    # Endpoint deliberately redacted in the original post.
    url = "Protecting a vulnerable API"

    # Write the CSV header row once; the scrape appends data rows afterwards.
    with open("Processing and Visualization/120920222.csv", "a", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(long)
    start = time.time()

    # asyncio.run() replaces the deprecated get_event_loop()/
    # run_until_complete()/close() boilerplate of the original.
    asyncio.run(main())
    stop = time.time()
    print(stop - start)

Enter fullscreen mode Exit fullscreen mode

Those lines literally took me 2 days. To this day I still don't (or don't want to) know how I wrote these:

  local_dic = very_raw["List"][current_dic_number]
  local_key_list = []
  final_list = []
  for keys in local_dic:
     if keys not in low_list:
           local_key_list.append(keys)
           print(local_key_list)
           for key in long:
               if key not in local_key_list:
Enter fullscreen mode Exit fullscreen mode

A nice shit.

Image description

Image description
In 20 seconds:

Image description

But I want all the useless data, like agency phone numbers, so I can call them at midnight when I'm scared to sleep.
Before dumping it I wrote a little testing code to understand things better.

import asyncio
import time
import psutil
# Launch 50,000 requests to test whether there is any latency or memory loss when the program reads from local vs. shared data. The answer: they are the same.

var = 100  # shared module-level value (the "shared" case of the experiment);
           # the original `var = 100 (shared)` was a syntax error.


async def async_scraper(ID):
    """Read the shared `var`, do a trivial computation and report process RSS.

    `ID` is accepted only so the caller can fan out many tasks; it is unused.
    """
    # var = 100  # (local) — uncomment to test the "local" case instead
    var_final = var + 1000
    print(var_final)
    proc = psutil.Process()
    # rss is in bytes; convert to MiB for readability.
    rss_mb = proc.memory_info().rss / (1024 ** 2)
    print("Memory used:", rss_mb, "MB")

async def Gather():
    """Run async_scraper for IDs 1..49999 concurrently."""
    await asyncio.gather(*(async_scraper(task_id) for task_id in range(1, 50000)))

start = time.time()
# asyncio.run() replaces the deprecated get_event_loop()/
# run_until_complete()/close() boilerplate and guarantees loop cleanup.
asyncio.run(Gather())
stop = time.time()
print(stop - start)


Enter fullscreen mode Exit fullscreen mode
- The behavior of importing the function vs. defining it locally is the same — both just look up the object's location in memory. I also tested launching 50,000 requests to see whether runtime or memory usage changed between the two. It's the same.

import asyncio
import time
from TestingModules import Gather

start = time.time()
# Same benchmark as before, but with Gather imported from a module.
# asyncio.run() replaces the deprecated event-loop boilerplate.
asyncio.run(Gather())
stop = time.time()
print(stop - start)


Enter fullscreen mode Exit fullscreen mode
import time
import asyncio
import aiohttp
from pymongo import MongoClient, errors
from pymongo.server_api import ServerApi
from Credentials import Credentials
from datetime import datetime
from motor.motor_asyncio import AsyncIOMotorClient

def convert_milliseconds(ms):
    """Return the local datetime for an epoch value given in milliseconds.

    `ms` may be an int or a numeric string; it is divided by 1000 to get
    seconds before conversion.
    """
    return datetime.fromtimestamp(int(ms) / 1000.0)

async def async_scraper(ID):
    """Fetch one page of listings, normalise its date fields and insert the batch.

    Relies on module-level globals: `url`, `headers`, `coll` (Motor collection)
    and `timestamp` (label for this scrape run).  Returns True when the insert
    was acknowledged, False otherwise.
    """
    params = {
        'page': f'{ID}',
        'rows': '3000',
    }
    async with aiohttp.ClientSession() as session:
        async with session.get(url=url, headers=headers, params=params) as r:
            # Renamed from `json` to avoid shadowing the stdlib module name.
            payload = await r.json()
            batch = payload['List']
            print(len(batch))
            for doc in batch:
                doc["G_Date"] = timestamp
                # Fields arrive as "/Date(1670287284756)/"; [6:-2] strips the
                # wrapper.  Best-effort: skip fields that are absent/malformed,
                # but catch only the expected errors instead of a bare except.
                for field in ("StartDate", "EndDate", "AsAt"):
                    try:
                        doc[field] = convert_milliseconds(doc[field][6:-2])
                    except (KeyError, TypeError, ValueError):
                        pass
                # Convert every open-home slot — the original copy-pasted code
                # only handled the first two entries.
                for slot in doc.get("OpenHomes") or []:
                    try:
                        slot["Start"] = convert_milliseconds(slot["Start"][6:-2])
                        slot["End"] = convert_milliseconds(slot["End"][6:-2])
                    except (KeyError, TypeError, ValueError):
                        pass
            result = await coll.insert_many(batch, ordered=False)
            if result.acknowledged:
                print(f"batch: {ID} Completed")
                # print(batch[0])
                return True
            print(f">>>> batch: {ID} ERROR")
            return False
async def Gather(pages=10):
    """Run async_scraper for pages 0..pages-1 concurrently.

    `pages` replaces the `range(/)` placeholder the original post used to
    redact the real page count; the default keeps `Gather()` callable as before.
    """
    await asyncio.gather(*(async_scraper(ID) for ID in range(pages)))

def loop():
    """Run the Gather coroutine to completion on a fresh event loop."""
    # asyncio.run() replaces the deprecated get_event_loop()/
    # run_until_complete()/close() pattern and avoids the original's local
    # `loop` variable shadowing this function's own name.
    asyncio.run(Gather())

headers = {
    'authority': 'api.trademe.co.nz',
    'method': 'GET',
    # Fixed typo: original sent 'pplication/json, ...'.
    'accept': 'application/json, text/plain, */*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'en,en-US;q=0.9',
    'cache-control': 'max-age=0',
    'newrelic': 'eyJ2IjpbMCwx=XSwiZCI6eyJ0eSI6IkJyb3dzZXIiLCJhYyI6IjQzODYzOCIsImFwIjoiMzgwMDc2Nzg0IiwiaWQiOiIxOGZjMWRkNzM1YTE4MmViIiwidHIiOiI4NGRiNzIyODE2Yjk1NzhjZDEzMTIxYjc3MGQ3MzQwMCIsInRpIjoxNjcwMjg3Mjg0NzU2fX0=',
    'origin': 'https://www.trademe.co.nz',
    'referer': 'https://www.trademe.co.nz/',
    'sec-ch-ua': '"Not?A_Brand";v="8", "Chromium";v="108", "Google Chrome";v="108"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'sec-fetch-dest': 'empty',
    'sec-fetch-mode': 'cors',
    'sec-fetch-site': 'some-site',
    'upgrade-insecure-requests': '1',
}
# Endpoint deliberately redacted in the original post.
url = "Protecting a vulnerable API"

uri = Credentials.uri
client = AsyncIOMotorClient(uri, server_api=ServerApi('1'))
try:
    # NOTE(review): with Motor, server_info() returns an awaitable, which is
    # always truthy — so this does not actually verify connectivity.  A real
    # check would `await client.admin.command('ping')` inside the event loop;
    # kept as-is to preserve the script's structure.  TODO confirm.
    if client.server_info():
        print("Connected to MongoDB server")
        start = time.time()
        db = client["Tradme"]
        coll = db["Tradme_Property_Test"]
        # Single timestamp labelling every document from this scrape run.
        current_datetime = datetime.now()
        timestamp = current_datetime.strftime("%Y-%m-%d %H:%M:%S")
        loop()
        stop = time.time()
        print(stop - start)
        # Removed the bare `coll.estimated_document_count()` call: under Motor
        # it returns an un-awaited coroutine whose result was discarded anyway.
        client.close()


except errors.ConnectionFailure as e:
    print("Failed to connect to MongoDB server:", e)
except Exception as e:
    print("An error occurred:", e)

Enter fullscreen mode Exit fullscreen mode

Solving performance issues:
20s to 197s? I know there is something wrong here.

Image description
Added await as there is some bottleneck.
Image description

Top comments (0)