#!/usr/bin/python3
"""
(C) 2023 Christopher Overbeck
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License version 3.
http://www.gnu.org/licenses/gpl-3.0.html
"""
import asyncio
import csv
import logging
import re
import time

import aiohttp
from bs4 import BeautifulSoup
# Extension of Semaphore that also throttles the number of requests per second.
class ThrottledSemaphore(asyncio.Semaphore):
    """Semaphore that also enforces a minimum interval between acquisitions.

    Works like ``asyncio.Semaphore`` (at most ``value`` concurrent holders)
    and additionally spaces successful acquisitions at least ``rate`` seconds
    apart.

    Args:
        value: Maximum number of concurrent holders.
        rate: Minimum number of seconds between two acquisitions.
    """

    def __init__(self, value, rate):
        super().__init__(value)
        self.rate = rate
        # Wall-clock time (time.time()) of the latest acquisition; 0 until
        # the first one happens.
        self.last_request = 0
        # Monotonic time at which the next acquisition is allowed to proceed.
        self._next_slot = 0.0

    async def acquire(self):
        """Acquire the semaphore, then wait for the next free rate slot.

        Returns:
            True, like ``asyncio.Semaphore.acquire``.
        """
        await super().acquire()
        now = time.monotonic()
        # Reserve the slot *before* sleeping. If every waiter compared against
        # the same timestamp and slept the same amount, they would all wake
        # together and the rate limit would not hold.
        slot = max(now, self._next_slot)
        self._next_slot = slot + self.rate
        try:
            if slot > now:
                await asyncio.sleep(slot - now)
        except BaseException:
            # Interrupted (e.g. cancelled) while throttling: hand the
            # semaphore back so the permit is not leaked.
            self.release()
            raise
        self.last_request = time.time()
        return True
async def main():
    """Scrape the launch chronology and report the mass launched per year.

    Prints, for each year, the total mass, the number of launches and the
    number of launches with unknown (0) mass, then writes the same rows to
    ``launches.csv`` in the current working directory.
    """
    start_url = 'http://space.skyrocket.de/directories/chronology.htm'
    max_concurrent_connections = 5
    max_connections_per_second = 10
    semaphore = ThrottledSemaphore(max_concurrent_connections, 1 / max_connections_per_second)

    async with aiohttp.ClientSession() as session:
        year_links = await get_year_links(session, start_url)
        # gather() copes with an empty list of links (asyncio.wait() raises
        # ValueError) and returns results as a list rather than an
        # unordered set.
        results = await asyncio.gather(
            *(get_year(semaphore, session, link) for link in year_links)
        )
    # aiohttp's documentation recommends a zero-sleep after closing the
    # session so the underlying connections can finish closing. The function
    # then returns, letting the script terminate.
    await asyncio.sleep(0)

    # Report in chronological order regardless of the order of the links.
    years = sorted(results, key=lambda row: row[0])

    print("---")
    for year, total_mass, num_launches, num_unknown_mass in years:
        print(f"{year}: {total_mass} KG, {num_launches} launches, {num_unknown_mass} unknown mass")
    print("---")

    # newline='' is what the csv module expects; without it extra blank rows
    # appear on platforms that translate line endings.
    with open('launches.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['year', 'total_mass', 'num_launches', 'num_unknown_mass'])
        writer.writerows(years)
async def get_year_links(client_session: aiohttp.ClientSession, start_url: str):
    """Return a list of links to each year's launches.

    Args:
        client_session: Session used to download ``start_url``.
        start_url: URL of the chronology page that links to the year pages.

    Returns:
        The ``href`` value of every anchor on the page whose target contains
        ``doc_chr``, in document order.
    """
    async with client_session.get(start_url) as response:
        html = await response.text()
    parsed_page = BeautifulSoup(html, features="html.parser")
    # Read the href attribute from the parsed tag instead of slicing the
    # tag's string form at the first quote, which picks up the wrong value
    # when another attribute precedes href.
    return [
        anchor['href']
        for anchor in parsed_page.find_all('a', href=True)
        if 'doc_chr' in anchor['href']
    ]
class Counter:
    """Progress counter that prints its state every time it is incremented.

    Args:
        max_count: Total number of increments expected; shown as the
            denominator in the progress line.
        label: Text printed in front of the progress figures.
    """

    def __init__(self, max_count: int, label: str = None):
        self.count = 0
        self.max_count = max_count
        self.label = label

    def increment(self):
        """Advance the counter by one and print ``<label> <count>/<max_count>``."""
        self.count += 1
        print(f"{self.label} {self.count}/{self.max_count}")
async def get_year(semaphore: asyncio.Semaphore, client_session: aiohttp.ClientSession, link: str):
    """Return (year, total_mass, num_launches, num_unknown_mass) for the passed year link.

    Args:
        semaphore: Limits the concurrent requests; also passed on to the
            per-launch requests.
        client_session: Session used for all HTTP requests.
        link: Relative link to the year page, e.g. ``../doc_chr/lau1957.htm``.

    Returns:
        A tuple of the year (first run of digits in ``link``), the sum of all
        launch masses, the number of launch links found on the page, and how
        many of those yielded a mass of 0 (unknown).

    Raises:
        AttributeError: If ``link`` contains no digits.
    """
    # links look like ../doc_chr/lau1957.htm
    year = int(re.search(r'(\d+)', link).group(0))

    # Hold the semaphore only while downloading the year page. The per-launch
    # tasks below acquire it themselves, so it must be free again by then.
    async with semaphore:
        # link[3:] drops the leading "../" of the relative link.
        async with client_session.get('http://space.skyrocket.de/' + link[3:]) as response:
            html = await response.text()

    parsed_page = BeautifulSoup(html, features="html.parser")
    # Read the href attribute from the parsed tag rather than slicing the
    # tag's string form at the first quote.
    year_launches = [
        anchor['href']
        for anchor in parsed_page.find_all('a', href=True)
        if 'doc_sdat' in anchor['href']
    ]

    counter = Counter(len(year_launches), f"{year}")
    # gather() returns an empty list for a page without launch links, where
    # asyncio.wait() would raise ValueError.
    masses = await asyncio.gather(
        *(get_launch_mass(semaphore, client_session, launch, counter) for launch in year_launches)
    )
    return (year, sum(masses), len(masses), masses.count(0))
async def get_launch_mass(semaphore: asyncio.Semaphore, client_session: aiohttp.ClientSession, launch_url: str, counter: Counter):
    """Return the mass of a given launch, or 0 if the mass is unknown.

    Args:
        semaphore: Limits the concurrent requests.
        client_session: Session used to download the launch page.
        launch_url: Relative link to the launch page; its first three
            characters are dropped before it is appended to the site root.
        counter: Progress counter, incremented once per call whether or not
            a mass was found.

    Returns:
        The first run of digits found in the element following a ``Mass:``
        label, as an int. If several labels carry a number the last one wins.
        0 when no mass could be determined or the page could not be fetched
        or parsed.
    """
    mass = 0
    try:
        async with semaphore:
            async with client_session.get('http://space.skyrocket.de/' + launch_url[3:]) as response:
                html = await response.text()
        parsed_page = BeautifulSoup(html, features="html.parser")
        # `string=` is the current name of find_all's former `text=` keyword.
        for pre_mass_element in parsed_page.find_all(string='Mass:'):
            mass_element = pre_mass_element.find_next()
            if mass_element is None:
                continue
            match = re.search(r'(\d+)', mass_element.text)
            if match:
                mass = int(match.group(0))
    except Exception:
        # Best effort by design: one failing page must not abort the whole
        # run, so report the mass as unknown. The failure is logged at debug
        # level so it can still be inspected.
        logging.getLogger(__name__).debug(
            "Error getting mass for %s", launch_url, exc_info=True
        )
    counter.increment()
    return mass
# Run the scraper only when executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(main())