import json import time from urllib.parse import quote import pydest from bs4 import BeautifulSoup from bungied2auth import BungieOAuth from datetime import datetime, timezone, timedelta from dateutil.parser import * import aiohttp import sqlite3 import matplotlib.pyplot as plt import csv import codecs import asyncio class D2data: api_data_file = open('api.json', 'r') api_data = json.loads(api_data_file.read()) destiny = '' cache_db = '' data_db = sqlite3.connect('data.db') data_cursor = data_db.cursor() icon_prefix = "https://www.bungie.net" token = {} headers = {} data = {} wait_codes = [1672] max_retries = 10 vendor_params = { 'components': '400,401,402,302,304,306,310,305' } activities_params = { 'components': '204' } record_params = { "components": "900,700" } metric_params = { "components": "1100" } char_info = {} def __init__(self, prod, context, **options): super().__init__(**options) if prod: self.oauth = BungieOAuth(self.api_data['id'], self.api_data['secret'], context=context, host='0.0.0.0', port='4200') else: self.oauth = BungieOAuth(self.api_data['id'], self.api_data['secret'], host='localhost', port='4200') self.session = aiohttp.ClientSession() self.cache_db = sqlite3.connect('cache.db') try: self.data_cursor.execute('''CREATE TABLE "dailyrotations" ("items" TEXT)''') except sqlite3.OperationalError: pass try: self.data_cursor.execute('''CREATE TABLE "weeklyrotations" ("items" TEXT)''') except sqlite3.OperationalError: pass try: self.data_cursor.execute('''CREATE TABLE "season_ev" ("items" TEXT)''') except sqlite3.OperationalError: pass try: self.data_cursor.execute('''CREATE TABLE "evweekly" ("items" TEXT)''') except sqlite3.OperationalError: pass self.data_db.commit() async def get_spider(self, size='tall', langs=['ru'], forceget=False): char_info = self.char_info rotations = {} lang = 'ru' spider_url = 'https://www.bungie.net/platform/Destiny2/{}/Profile/{}/Character/{}/Vendors/863940356/'. \ format(char_info['platform'], char_info['membershipid'], char_info['charid'][0]) spider_resp = await self.get_bungie_json('spider', spider_url, self.vendor_params, string='spider') if spider_resp: spider_json = spider_resp spider_cats = spider_json['Response']['categories']['data']['categories'] spider_def = await self.destiny.decode_hash(863940356, 'DestinyVendorDefinition', language=lang) items_to_get = spider_cats[0]['itemIndexes'] spider = await self.get_vendor_sales(lang, spider_resp, items_to_get, [1812969468]) rotations = { 'name': 'Паук', 'size': size, 'items': spider, 'template': 'table_items.html' } return rotations async def get_drifter(self, size='tall', langs=['ru'], forceget=False): char_info = self.char_info rotations = {} lang = 'ru' cat_templates = { '6': 'contract_item.html', '0': 'weapon_item.html', '4': 'armor_item.html' } drifter_url = 'https://www.bungie.net/platform/Destiny2/{}/Profile/{}/Character/{}/Vendors/248695599/'. \ format(char_info['platform'], char_info['membershipid'], char_info['charid'][0]) drifter_resp = await self.get_bungie_json('drifter', drifter_url, self.vendor_params, string='drifter') if drifter_resp: drifter_cats = drifter_resp['Response']['categories']['data']['categories'] drifter_def = await self.destiny.decode_hash(248695599, 'DestinyVendorDefinition', language=lang) sales = [] for category in drifter_cats: if category['displayCategoryIndex'] in [2,3]: continue cat_sales = await self.get_vendor_sales(lang, drifter_resp, category['itemIndexes'], []) sales.append({ 'name': drifter_def['displayCategories'][category['displayCategoryIndex']]['displayProperties']['name'], 'items': cat_sales, 'template': cat_templates[str(category['displayCategoryIndex'])] }) rotations = { 'name': 'Скиталец', 'size': size, 'items': sales, 'template': 'vendor_items.html', 'annotations': [] } return rotations async def get_xur_loc(self): url = 'https://paracausal.science/xur/current.json' session = aiohttp.ClientSession() r = await session.get(url) r_json = await r.json() await session.close() return r_json async def get_xur(self, langs=['ru'], size='', forceget=False): char_info = self.char_info rotations = {} lang = 'ru' cat_templates = { '6': 'contract_item.html', '0': 'weapon_item.html', '4': 'armor_item.html' } xur_url = 'https://www.bungie.net/platform/Destiny2/{}/Profile/{}/Character/{}/Vendors/2190858386/'. \ format(char_info['platform'], char_info['membershipid'], char_info['charid'][0]) xur_resp = await self.get_bungie_json('xur', xur_url, self.vendor_params) xur_loc_task = asyncio.ensure_future(self.get_xur_loc()) xur_loc = await asyncio.gather(xur_loc_task) xur_loc = xur_loc[0] if xur_resp: xur_cats = xur_resp['Response']['categories']['data']['categories'] xur_def = await self.destiny.decode_hash(2190858386, 'DestinyVendorDefinition', language=lang) cat_sales = await self.get_vendor_sales(lang, xur_resp, xur_cats[0]['itemIndexes'], [3875551374]) xur_sales = xur_resp['Response']['sales']['data'] sales = [] if xur_loc: xur_place_name = await self.destiny.decode_hash(xur_loc['placeHash'], 'DestinyPlaceDefinition', language=lang) xur_destination_name = await self.destiny.decode_hash(xur_loc['destinationHash'], 'DestinyDestinationDefinition', language=lang) sales = [{'name': '{}, {}'.format(xur_place_name['displayProperties']['name'], xur_destination_name['displayProperties']['name']), 'items': [], 'template': cat_templates['6']}, {'name': 'Оружие', 'items': [], 'template': cat_templates['0']}, {'name': 'Броня', 'items': [], 'template': cat_templates['4']}] for key in sorted(xur_sales.keys()): item_hash = xur_sales[key]['itemHash'] if item_hash not in [4285666432, 2293314698, 2125848607, 3875551374]: definition = 'DestinyInventoryItemDefinition' item_resp = await self.destiny.decode_hash(item_hash, definition, language=lang) item_name = item_resp['displayProperties']['name'] if item_resp['itemType'] == 2: for item in cat_sales: if item['hash'] == item_hash: sales[2]['items'].append(item) else: for item in cat_sales: if item['hash'] == item_hash: sales[1]['items'].append(item) # sales.append({ # 'name': xur_def['displayCategories'][category['displayCategoryIndex']]['displayProperties']['name'], # 'items': cat_sales, # 'template': cat_templates[str(category['displayCategoryIndex'])] # }) rotations = { 'name': 'Зур', 'size': size, 'items': sales, 'template': 'vendor_items.html', 'annotations': ['Данные о местоположении предоставлены сервисом paracausal.science'] } return rotations async def get_heroic_story(self, size='wide', langs=['ru'], forceget=False): activities_resp = await self.get_activities_response('heroicstory', string='heroic story missions', force=forceget) if not activities_resp: return {} for lang in langs: heroics = [] activities_json = activities_resp for key in activities_json['Response']['activities']['data']['availableActivities']: item_hash = key['activityHash'] definition = 'DestinyActivityDefinition' r_json = await self.destiny.decode_hash(item_hash, definition, language=lang) if 'Ежедневная героическая сюжетная миссия: ' in r_json['displayProperties']['name']: info = { "name": r_json['selectionScreenDisplayProperties']['name'], "description": r_json['selectionScreenDisplayProperties']['description'] } heroics.append(info) return { 'name': 'Героические сюжетные миссии', 'size': size, 'items': heroics, 'template': 'table_items.html' } async def get_forge(self, size='', langs=['ru'], forceget=False): activities_resp = await self.get_activities_response('forge', force=forceget) if not activities_resp: return {} for lang in langs: forges = [] activities_json = activities_resp for key in activities_json['Response']['activities']['data']['availableActivities']: item_hash = key['activityHash'] definition = 'DestinyActivityDefinition' r_json = await self.destiny.decode_hash(item_hash, definition, language=lang) if 'Кузница' in r_json['displayProperties']['name']: forge_def = 'DestinyDestinationDefinition' place = await self.destiny.decode_hash(r_json['destinationHash'], forge_def, language=lang) info = { "icon": r_json['displayProperties']['icon'], "name": r_json['displayProperties']['name'], "description": place['displayProperties']['name'] } forges.append(info) rotations = { 'name': 'Кузница', 'size': size, 'items': forges, 'template': 'table_items.html' } return rotations async def get_strike_modifiers(self, size='wide', langs=['ru'], forceget=False): char_info = self.char_info activities_resp = await self.get_activities_response('vanguardstrikes', string='strike modifiers') lang = 'ru' rotations = {} if activities_resp: activities_json = activities_resp modifiers = [] for key in activities_json['Response']['activities']['data']['availableActivities']: item_hash = key['activityHash'] definition = 'DestinyActivityDefinition' r_json = await self.destiny.decode_hash(item_hash, definition, language=lang) if 'Ежедневная героическая сюжетная миссия: ' in r_json['displayProperties']['name']: modifiers = await self.decode_modifiers(key, lang) rotations = { 'name': 'Модификаторы плейлиста налетов', 'size': size, 'items': modifiers, 'template': 'table_items.html' } return rotations def add_reckoning_boss(self, lang): first_reset_time = 1539709200 seconds_since_first = time.time() - first_reset_time weeks_since_first = seconds_since_first // 604800 reckoning_bosses = ['Мечи', 'Двойник Орикса'] return reckoning_bosses[int(weeks_since_first % 2)] async def get_reckoning_modifiers(self, size='wide', langs=['ru'], forceget=False): activities_resp = await self.get_activities_response('reckoning', string='reckoning modifiers', force=forceget) if not activities_resp: return {} for lang in langs: r_info = { 'icon': '/common/destiny2_content/icons/' 'DestinyActivityModeDefinition_e74b3385c5269da226372df8ae7f500d.png', 'name': 'Босс', 'description': self.add_reckoning_boss(lang) } activities_json = activities_resp for key in activities_json['Response']['activities']['data']['availableActivities']: item_hash = key['activityHash'] definition = 'DestinyActivityDefinition' r_json = await self.destiny.decode_hash(item_hash, definition, language=lang) if 'Суд: Уровень III' in r_json['displayProperties']['name']: mods = await self.decode_modifiers(key, lang) r_info = [r_info, *mods] return { 'name': 'Суд', 'size': size, 'items': r_info, 'template': 'table_items.html' } async def get_nightfall820(self, size='', langs=['ru'], forceget=False): lang = 'ru' activities_resp = await self.get_activities_response('nightfalls820', string='820 nightfalls') if activities_resp: activities_json = activities_resp nightfalls = [] for key in activities_json['Response']['activities']['data']['availableActivities']: item_hash = key['activityHash'] definition = 'DestinyActivityDefinition' r_json = await self.destiny.decode_hash(item_hash, definition, language=lang) try: recommended_light = key['recommendedLight'] if recommended_light == 820: if r_json['matchmaking']['requiresGuardianOath']: info = { 'name': 'Сумрак с гидом', 'description': r_json['selectionScreenDisplayProperties']['name'] } else: info = { 'name': r_json['selectionScreenDisplayProperties']['name'], 'description': r_json['selectionScreenDisplayProperties']['description'] } nightfalls.append(info) except KeyError: pass return { 'name': 'Сумрачные налеты', 'size': size, 'items': nightfalls, 'template': 'table_items.html' } else: return {} async def get_modifiers(self, lang, act_hash): url = 'https://www.bungie.net/{}/Explore/Detail/DestinyActivityDefinition/{}'.format(lang, act_hash) session = aiohttp.ClientSession() r = await session.get(url) r = await r.text() soup = BeautifulSoup(r, features="html.parser") modifier_list = soup.find_all('div', {'data-identifier': 'modifier-information'}) modifiers = [] for item in modifier_list: modifier = item.find('div', {'class': 'text-content'}) modifier_title = modifier.find('div', {'class': 'title'}) modifier_subtitle = modifier.find('div', {'class': 'subtitle'}) mod = { "name": modifier_title.text, "description": modifier_subtitle.text } modifiers.append(mod) if r: await session.close() return modifiers else: await session.close() return False async def get_raids(self, size='wide tall', langs=['ru'], forceget=False): activities_resp = await self.get_activities_response('raids', force=forceget) if not activities_resp: return {} raids = [] for lang in langs: first_reset_time = 1580230800 seconds_since_first = time.time() - first_reset_time weeks_since_first = seconds_since_first // 604800 eow_loadout = int(weeks_since_first % 6) last_wish_challenges = [1250327262, 3871581136, 1568895666, 4007940282, 2836954349] sotp_challenges = [1348944144, 3415614992, 1381881897] cos_challenges = [2459033425, 2459033426, 2459033427] levi_order = { "417231112": "1. Сады удовольствий
\n2. Турнир
\n3. Королевские бассейны
\n4. Трон", "757116822": "1. Турнир
\n2. Королевские бассейны
\n3. Сады удовольствий
\n4. Трон", "1685065161": "1. Турнир
\n2. Сады удовольствий
\n3. Королевские бассейны
\n4. Трон", "2449714930": "1. Королевские бассейны
\n2. Турнир
\n3. Сады удовольствий
\n4. Трон", "3446541099": "1. Сады удовольствий
\n2. Королевские бассейны
\n3. Турнир
\n4. Трон", "3879860661": "1. Королевские бассейны
\n2. Сады удовольствий
\n3. Турнир
\n4. Трон" } eow_loadouts = [ "Кинетическое: Револьвер", "Энергетическое: Снайперская винтовка", "Силовое: Любое", "Кинетическое: Автомат", "Энергетическое: Автомат", "Силовое: Любое", "Кинетическое: Пистолет", "Энергетическое: Винтовка разведчика", "Силовое: Меч", "Кинетическое: Пистолет-пулемет", "Энергетическое: Любое", "Силовое: Гранатомет", "Кинетическое: Любое", "Энергетическое: Плазменная винтовка", "Силовое: Плазменная винтовка", "Кинетическое: Дробовик", "Энергетическое: Автомат", "Силовое: Ракетная установка" ] lw_ch = 0 sotp_ch = 0 cos_ch = 0 hawthorne_url = 'https://www.bungie.net/platform/Destiny2/{}/Profile/{}/Character/{}/Vendors/3347378076/'. \ format(self.char_info['platform'], self.char_info['membershipid'], self.char_info['charid'][0]) hawthorne_resp = await self.get_bungie_json('hawthorne', hawthorne_url, self.vendor_params) if not hawthorne_resp: return hawthorne_json = hawthorne_resp for cat in hawthorne_json['Response']['sales']['data']: if hawthorne_json['Response']['sales']['data'][cat]['itemHash'] in last_wish_challenges: lw_ch = hawthorne_json['Response']['sales']['data'][cat]['itemHash'] elif hawthorne_json['Response']['sales']['data'][cat]['itemHash'] in sotp_challenges: sotp_ch = hawthorne_json['Response']['sales']['data'][cat]['itemHash'] elif hawthorne_json['Response']['sales']['data'][cat]['itemHash'] in cos_challenges: cos_ch = hawthorne_json['Response']['sales']['data'][cat]['itemHash'] activities_json = activities_resp for key in activities_json['Response']['activities']['data']['availableActivities']: item_hash = key['activityHash'] definition = 'DestinyActivityDefinition' r_json = await self.destiny.decode_hash(item_hash, definition, language=lang) i = 1 if str(r_json['hash']) in levi_order.keys() and \ not r_json['matchmaking']['requiresGuardianOath']: challenges = await self.get_modifiers(lang, item_hash) if challenges: challenge = set(challenges[0]['name'].lower().replace('"', '').split(' ')) challenge.discard('the') order_strings = levi_order[str(r_json['hash'])].splitlines() levi_str = '' for string in order_strings: intersection = challenge.intersection(set(string.lower().split(' '))) if intersection: levi_str = '{}{}\n'.format(levi_str, string) else: levi_str = '{}{}\n'.format(levi_str, string) levi_str = levi_str[:-1] else: levi_str = levi_order[str(r_json['hash'])] info = { 'name': r_json['originalDisplayProperties']['name'], 'description': levi_str } raids.append(info) if 'пожиратель миров: Престиж' in r_json['displayProperties']['name'] and \ not r_json['matchmaking']['requiresGuardianOath']: info = { 'inline': False, 'name': 'Пожиратель Миров и Звездный Шпиль', 'description': u"\u2063" } mods = await self.get_modifiers(lang, r_json['hash']) resp_time = datetime.utcnow().isoformat() if mods: loadout = '{}
\n{}\n
{}'.format(eow_loadouts[eow_loadout*3], eow_loadouts[eow_loadout*3+1], eow_loadouts[eow_loadout*3+2]) info['description'] = '{}: {}\n
\n{}:\n
{}'.format(mods[0]['name'], mods[0]['description'], mods[1]['name'], loadout) raids.append(info) if 'Последнее желание' in r_json['displayProperties']['name'] and \ not r_json['matchmaking']['requiresGuardianOath'] and lw_ch != 0: info = { 'name': r_json['originalDisplayProperties']['name'], 'description': u"\u2063" } curr_challenge = lw_ch curr_challenge = await self.destiny.decode_hash(curr_challenge, 'DestinyInventoryItemDefinition', language=lang) info['description'] = curr_challenge['displayProperties']['name'] raids.append(info) if 'Истребители прошлого' in r_json['displayProperties']['name'] and \ not r_json['matchmaking']['requiresGuardianOath'] and sotp_ch != 0: info = { 'name': r_json['originalDisplayProperties']['name'], 'description': u"\u2063" } curr_challenge = sotp_ch curr_challenge = await self.destiny.decode_hash(curr_challenge, 'DestinyInventoryItemDefinition', language=lang) info['description'] = curr_challenge['displayProperties']['name'] raids.append(info) if 'Корона скорби' in r_json['displayProperties']['name'] and \ not r_json['matchmaking']['requiresGuardianOath'] and cos_ch != 0: info = { 'name': r_json['originalDisplayProperties']['name'], 'descripition': u"\u2063" } curr_challenge = cos_ch curr_challenge = await self.destiny.decode_hash(curr_challenge, 'DestinyInventoryItemDefinition', language=lang) info['description'] = curr_challenge['displayProperties']['name'] raids.append(info) if 'Сад спасения' in r_json['displayProperties']['name'] and \ not r_json['matchmaking']['requiresGuardianOath']: info = { 'name': r_json['originalDisplayProperties']['name'], 'description': u"\u2063" } mods = await self.get_modifiers(lang, r_json['hash']) if mods: info['description'] = mods[0]['name'] raids.append(info) return { 'name': 'Рейды', 'size': size, 'items': raids, 'template': 'table_items.html' } async def get_ordeal(self, size='', langs=['ru'], forceget=False): activities_resp = await self.get_activities_response('ordeal', force=forceget) if not activities_resp: return {} for lang in langs: strikes = [] ordeal = [] activities_json = activities_resp for key in activities_json['Response']['activities']['data']['availableActivities']: item_hash = key['activityHash'] definition = 'DestinyActivityDefinition' r_json = await self.destiny.decode_hash(item_hash, definition, language=lang) if r_json['activityTypeHash'] == 4110605575: strikes.append({"name": r_json['displayProperties']['name'], "description": r_json['displayProperties']['description']}) if 'Сумрачный налет: Побоище' in r_json['displayProperties']['name'] and \ ': Мастер' in r_json['displayProperties']['name']: info = { 'name': r_json['originalDisplayProperties']['description'], 'description': u"\u2063" } ordeal.append(info) if info: for strike in strikes: if strike['name'] in info['name']: info['description'] = strike['description'] break return { 'name': 'Сумрачный налет: Побоище', 'size': size, 'items': ordeal, 'template': 'table_items.html' } async def get_nightmares(self, size='', langs=['ru'], forceget=False): activities_resp = await self.get_activities_response('nightmares', force=forceget) if not activities_resp: return {} for lang in langs: nightmares = [] activities_json = activities_resp for key in activities_json['Response']['activities']['data']['availableActivities']: item_hash = key['activityHash'] definition = 'DestinyActivityDefinition' r_json = await self.destiny.decode_hash(item_hash, definition, language=lang) if 'Охота на кошмаров:' in r_json['displayProperties']['name'] and \ ': Мастер' in r_json['displayProperties']['name']: info = { 'name': r_json['displayProperties']['name'].replace(': Мастер', "").replace('Охота на кошмаров: ', '').replace('\"', ''), 'description': r_json['displayProperties']['description'] } nightmares.append(info) return { 'name': 'Охоты на кошмаров', 'size': size, 'items': nightmares, 'template': 'table_items.html' } async def get_crucible_rotators(self, size='wide', langs=['ru'], forceget=False): activities_resp = await self.get_activities_response('cruciblerotators', string='crucible rotators', force=forceget) if not activities_resp: return {} rotators = [] for lang in langs: activities_json = activities_resp for key in activities_json['Response']['activities']['data']['availableActivities']: item_hash = key['activityHash'] definition = 'DestinyActivityDefinition' r_json = await self.destiny.decode_hash(item_hash, definition, language=lang) if r_json['destinationHash'] == 2777041980: if len(r_json['challenges']) > 0: obj_def = 'DestinyObjectiveDefinition' objective = await self.destiny.decode_hash(r_json['challenges'][0]['objectiveHash'], obj_def, lang) if 'Испытание из сменяемого плейлиста Горнила' in objective['displayProperties']['name'] or r_json['challenges'][0]['objectiveHash'] == 1607758693: if 'icon' in r_json['displayProperties']: icon = r_json['displayProperties']['icon'] else: icon = '' info = { 'icon': icon, "name": r_json['displayProperties']['name'], "description": r_json['displayProperties']['description'].replace('\n\n', '
') } if 'icon' in r_json['displayProperties']: info['icon'] = r_json['displayProperties']['icon'] else: info['icon'] = '/common/destiny2_content/icons/cc8e6eea2300a1e27832d52e9453a227.png' rotators.append(info) return { 'name': 'Сменяемые режимы горнила', 'size': size, 'items': rotators, 'template': 'table_items.html' } async def get_seasonal_eververse(self): tess_def = await self.destiny.decode_hash(3361454721, 'DestinyVendorDefinition') data = [ { 'name': 'Популярные предметы за яркую пыль', 'items': [] }, { 'name': 'Предметы за яркую пыль', 'items': [] }, { 'name': 'Потребляемые предметы за яркую пыль', 'items': [] }, # { # 'name': 'Яркие энграммы', # 'items': [] # }, { 'name': 'Популярные предметы за серебро', 'items': [] }] lang = 'ru' n_order = 0 for i, item in enumerate(tess_def['itemList']): definition = 'DestinyInventoryItemDefinition' item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) if 'screenshot' in item_def.keys(): screenshot = 'Screenshot'.format(item_def['screenshot']) else: screenshot = '' is_interesting = False if item['displayCategoryIndex'] == 2 and item['itemHash'] not in [353932628, 3260482534, 3536420626, 3187955025, 2638689062]: is_interesting = True cat_number = 2 data_index = 0 elif item['displayCategoryIndex'] == 9 and item['itemHash'] not in [353932628, 3260482534, 3536420626, 3187955025, 2638689062]: is_interesting = True cat_number = 7 data_index = 1 elif item['displayCategoryIndex'] == 10 and item['itemHash'] not in [353932628, 3260482534, 3536420626, 3187955025, 2638689062]: is_interesting = True cat_number = 9 data_index = 2 elif item['displayCategoryIndex'] == 1 and item['itemHash'] != 827183327: is_interesting = True cat_number = 1 data_index = 3 if is_interesting: item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) if len(item['currencies']) > 0: currency_resp = await self.destiny.decode_hash(item['currencies'][0]['itemHash'], definition, language=lang) else: currency_resp = {'displayProperties': {'icon': '', 'name': ''}} item['currencies'] = [{'quantity': ''}] data[data_index]['items'].append({ 'id': '{}_{}_{}'.format(item['itemHash'], cat_number, n_order), 'icon': item_def['displayProperties']['icon'], 'tooltip_id': '{}_{}_{}_tooltip'.format(item['itemHash'], cat_number, n_order), 'hash': item['itemHash'], 'name': item_def['displayProperties']['name'], 'screenshot': screenshot, 'costs': [ { 'currency_icon': currency_resp['displayProperties']['icon'], 'cost': item['currencies'][0]['quantity'], 'currency_name': currency_resp['displayProperties']['name'] }] }) n_order += 1 # engram_def = await self.destiny.decode_hash(tess_def['itemList'][-1]['itemHash'], 'DestinyInventoryItemDefinition', 'ru') # for category in engram_def['preview']['derivedItemCategories'][0]['items']: # cat_def = await self.destiny.decode_hash(category['itemHash'], 'DestinyInventoryItemDefinition', 'ru') # for cat_preview in cat_def['preview']['derivedItemCategories']: # for engram_item in cat_preview['items']: # item_def = await self.destiny.decode_hash(engram_item['itemHash'], definition, language=lang) # if 'screenshot' in item_def.keys(): # screenshot = 'Screenshot'.format(item_def['screenshot']) # else: # screenshot = '' # data[3]['items'].append({ # 'id': '{}_{}_{}'.format(engram_item['itemHash'], 1337, n_order), # 'icon': item_def['displayProperties']['icon'], # 'tooltip_id': '{}_{}_{}_tooltip'.format(engram_item['itemHash'], 1337, n_order), # 'hash': engram_item['itemHash'], # 'name': item_def['displayProperties']['name'], # 'screenshot': screenshot, # 'costs': [ # { # 'currency_icon': engram_def['displayProperties']['icon'], # 'cost': 1, # 'currency_name': engram_def['displayProperties']['name'] # }] # }) # n_order += 1 self.data_cursor.execute('''DROP TABLE season_ev''') self.data_cursor.execute('''CREATE TABLE "season_ev" ("items" TEXT)''') self.data_cursor.execute('''INSERT into season_ev VALUES (?)''', (str(data).replace('\"', '\\\"').replace('\'', '"'),)) self.data_db.commit() async def get_season_start(self): manifest_url = 'https://www.bungie.net/Platform/Destiny2/Manifest/' manifest_json = await self.get_bungie_json('default', manifest_url, {}, '') season_url = 'https://www.bungie.net{}'.format( manifest_json['Response']['jsonWorldComponentContentPaths']['en']['DestinySeasonDefinition']) season_json = await self.get_bungie_json('default', season_url, {}, '') for season in season_json: try: start = isoparse(season_json[season]['startDate']) end = isoparse(season_json[season]['endDate']) if start <= datetime.now(tz=timezone.utc) <= end: current_season = season return start except KeyError: pass return datetime.now(tz=timezone.utc) async def get_seasonal_featured_silver(self, langs, start): tess_def = await self.destiny.decode_hash(3361454721, 'DestinyVendorDefinition') silver = [] classnames = ["охотник", "варлок", "титан"] for lang in langs: n_items = 0 curr_week = [] i_week = 1 class_items = 0 n_order = 0 for i, item in enumerate(tess_def['itemList']): if n_items >= 5 and n_items - class_items / 3 * 2 >= 5: i_week = i_week + 1 silver.append(list.copy(curr_week)) n_items = 0 curr_week = [] class_items = 0 if item['displayCategoryIndex'] == 1 and item['categoryIndex'] != 37: definition = 'DestinyInventoryItemDefinition' item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) if len(item['currencies']) > 0: currency_resp = await self.destiny.decode_hash(item['currencies'][0]['itemHash'], definition, language=lang) else: currency_resp = {'displayProperties': {'icon': '', 'name': ''}} item['currencies'] = [{'quantity': ''}] cat_number = 4 if 'screenshot' in item_def.keys(): screenshot = 'Screenshot'.format(item_def['screenshot']) else: screenshot = '' curr_week.append({ 'id': '{}_{}_{}'.format(item['itemHash'], cat_number, n_order), 'icon': item_def['displayProperties']['icon'], 'tooltip_id': '{}_{}_{}_tooltip'.format(item['itemHash'], cat_number, n_order), 'hash': item['itemHash'], 'name': item_def['displayProperties']['name'], 'screenshot': screenshot, 'costs': [ { 'currency_icon': currency_resp['displayProperties']['icon'], 'cost': item['currencies'][0]['quantity'], 'currency_name': currency_resp['displayProperties']['name'] }] }) n_order += 1 n_items = n_items + 1 if item_def['classType'] < 3 or any( class_name in item_def['itemTypeDisplayName'].lower() for class_name in classnames): class_items = class_items + 1 return silver async def get_seasonal_featured_bd(self, langs, start): tess_def = await self.destiny.decode_hash(3361454721, 'DestinyVendorDefinition') bd = [] classnames = ["охотник", "варлок", "титан"] for lang in langs: nweeks = 0 n_items = 0 curr_week = [] i_week = 1 class_items = 0 n_order = 0 for i, item in enumerate(tess_def['itemList']): # if n_items >= 5 and n_items - class_items / 3 * 2 >= 5: # i_week = i_week + 1 # bd.append(list.copy(curr_week)) # n_items = 0 # curr_week = [] # class_items = 0 if item['displayCategoryIndex'] == 9 and item['itemHash'] not in [353932628, 3260482534, 3536420626, 3187955025, 2638689062]: definition = 'DestinyInventoryItemDefinition' item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) if 'item.ghost_hologram' in item_def['traitIds']: nweeks += 1 if len(item['currencies']) > 0: currency_resp = await self.destiny.decode_hash(item['currencies'][0]['itemHash'], definition, language=lang) else: currency_resp = {'displayProperties': {'icon': '', 'name': ''}} item['currencies'] = [{'quantity': ''}] cat_number = 4 if 'screenshot' in item_def.keys(): screenshot = 'Screenshot'.format(item_def['screenshot']) else: screenshot = '' curr_week.append({ 'id': '{}_{}_{}'.format(item['itemHash'], cat_number, n_order), 'icon': item_def['displayProperties']['icon'], 'tooltip_id': '{}_{}_{}_tooltip'.format(item['itemHash'], cat_number, n_order), 'hash': item['itemHash'], 'name': item_def['displayProperties']['name'], 'screenshot': screenshot, 'costs': [ { 'currency_icon': currency_resp['displayProperties']['icon'], 'cost': item['currencies'][0]['quantity'], 'currency_name': currency_resp['displayProperties']['name'] }], 'classType': item_def['classType'], 'itemTypeDisplayName': item_def['itemTypeDisplayName'].lower() }) n_order += 1 n_items = n_items + 1 if item_def['classType'] < 3 or any( class_name in item_def['itemTypeDisplayName'].lower() for class_name in classnames): class_items = class_items + 1 slots = [] curr_slot = [] n_items = 0 i_week = 0 class_items = 0 for item in curr_week: if n_items >= nweeks and n_items - class_items / 3 * 2 >= nweeks: i_week = i_week + 1 slots.append(list.copy(curr_slot)) n_items = 0 curr_slot = [] class_items = 0 if item['classType'] < 3 or any( class_name in item['itemTypeDisplayName'].lower() for class_name in classnames): class_items = class_items + 1 curr_slot.append(item) n_items += 1 slots.append(list.copy(curr_slot)) indexes = [0] * len(slots) for i in range(0, nweeks): curr_week = [] for slot in slots: if slot[indexes[slots.index(slot)]]['classType'] < 3 or any(class_name in slot[indexes[slots.index(slot)]]['itemTypeDisplayName'].lower() for class_name in classnames): curr_week = [*curr_week, *slot[indexes[slots.index(slot)]:indexes[slots.index(slot)] + 3]] indexes[slots.index(slot)] += 3 else: curr_week.append(slot[indexes[slots.index(slot)]]) indexes[slots.index(slot)] += 1 bd.append(list.copy(curr_week)) return bd async def get_seasonal_shaders(self, langs, start): tess_def = await self.destiny.decode_hash(3361454721, 'DestinyVendorDefinition') shaders = [] classnames = ["охотник", "варлок", "титан"] for lang in langs: n_items = 0 curr_week = [] i_week = 1 class_items = 0 n_order = 0 for i, item in enumerate(tess_def['itemList']): if n_items >= 4 and n_items - class_items / 3 * 2 >= 4: i_week = i_week + 1 shaders.append(list.copy(curr_week)) n_items = 0 curr_week = [] class_items = 0 if item['displayCategoryIndex'] == 10 and item['categoryIndex'] == 51: definition = 'DestinyInventoryItemDefinition' item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) if len(item['currencies']) > 0: currency_resp = await self.destiny.decode_hash(item['currencies'][0]['itemHash'], definition, language=lang) else: currency_resp = {'displayProperties': {'icon': '', 'name': ''}} item['currencies'] = [{'quantity': ''}] cat_number = 4 if 'screenshot' in item_def.keys(): screenshot = 'Screenshot'.format(item_def['screenshot']) else: screenshot = '' curr_week.append({ 'id': '{}_{}_{}'.format(item['itemHash'], cat_number, n_order), 'icon': item_def['displayProperties']['icon'], 'tooltip_id': '{}_{}_{}_tooltip'.format(item['itemHash'], cat_number, n_order), 'hash': item['itemHash'], 'name': item_def['displayProperties']['name'], 'screenshot': screenshot, 'costs': [ { 'currency_icon': currency_resp['displayProperties']['icon'], 'cost': item['currencies'][0]['quantity'], 'currency_name': currency_resp['displayProperties']['name'] }] }) n_order += 1 n_items = n_items + 1 if item_def['classType'] < 3 or any( class_name in item_def['itemTypeDisplayName'].lower() for class_name in classnames): class_items = class_items + 1 return shaders async def get_seasonal_transmats(self, langs, start): tess_def = await self.destiny.decode_hash(3361454721, 'DestinyVendorDefinition') transmats = [] classnames = ["охотник", "варлок", "титан"] for lang in langs: n_items = 0 curr_week = [] i_week = 1 class_items = 0 n_order = 0 for i, item in enumerate(tess_def['itemList']): if n_items >= 3 and n_items - class_items / 3 * 2 >= 3: i_week = i_week + 1 transmats.append(list.copy(curr_week)) n_items = 0 curr_week = [] class_items = 0 if item['displayCategoryIndex'] == 9 and item['categoryIndex'] == 52: definition = 'DestinyInventoryItemDefinition' item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) if len(item['currencies']) > 0: currency_resp = await self.destiny.decode_hash(item['currencies'][0]['itemHash'], definition, language=lang) else: currency_resp = {'displayProperties': {'icon': '', 'name': ''}} item['currencies'] = [{'quantity': ''}] cat_number = 4 if 'screenshot' in item_def.keys(): screenshot = 'Screenshot'.format(item_def['screenshot']) else: screenshot = '' curr_week.append({ 'id': '{}_{}_{}'.format(item['itemHash'], cat_number, n_order), 'icon': item_def['displayProperties']['icon'], 'tooltip_id': '{}_{}_{}_tooltip'.format(item['itemHash'], cat_number, n_order), 'hash': item['itemHash'], 'name': item_def['displayProperties']['name'], 'screenshot': screenshot, 'costs': [ { 'currency_icon': currency_resp['displayProperties']['icon'], 'cost': item['currencies'][0]['quantity'], 'currency_name': currency_resp['displayProperties']['name'] }] }) n_order += 1 n_items = n_items + 1 if item_def['classType'] < 3 or any( class_name in item_def['itemTypeDisplayName'].lower() for class_name in classnames): class_items = class_items + 1 return transmats async def get_seasonal_bd(self, langs, start): tess_def = await self.destiny.decode_hash(3361454721, 'DestinyVendorDefinition') bd = [] classnames = ["охотник", "варлок", "титан"] for lang in langs: nweeks = 0 n_items = 0 curr_week = [] i_week = 1 class_items = 0 n_order = 0 for i, item in enumerate(tess_def['itemList']): # if n_items >= 5 and n_items - class_items/3*2 >= 5: # i_week = i_week + 1 # bd.append(list.copy(curr_week)) # n_items = 0 # curr_week = [] # class_items = 0 if item['displayCategoryIndex'] == 2 and item['itemHash'] not in [353932628, 3260482534, 3536420626, 3187955025, 2638689062]: definition = 'DestinyInventoryItemDefinition' item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) item_def = await self.destiny.decode_hash(item['itemHash'], definition, language=lang) if 'item.spawnfx' in item_def['traitIds']: nweeks += 1 if len(item['currencies']) > 0: currency_resp = await self.destiny.decode_hash(item['currencies'][0]['itemHash'], definition, language=lang) else: currency_resp = {'displayProperties': {'icon': '', 'name': ''}} item['currencies'] = [{'quantity': ''}] cat_number = 2 if 'screenshot' in item_def.keys(): screenshot = 'Screenshot'.format(item_def['screenshot']) else: screenshot = '' curr_week.append({ 'id': '{}_{}_{}'.format(item['itemHash'], cat_number, n_order), 'icon': item_def['displayProperties']['icon'], 'tooltip_id': '{}_{}_{}_tooltip'.format(item['itemHash'], cat_number, n_order), 'hash': item['itemHash'], 'name': item_def['displayProperties']['name'], 'screenshot': screenshot, 'costs': [ { 'currency_icon': currency_resp['displayProperties']['icon'], 'cost': item['currencies'][0]['quantity'], 'currency_name': currency_resp['displayProperties']['name'] }], 'classType': item_def['classType'], 'itemTypeDisplayName': item_def['itemTypeDisplayName'] }) n_order += 1 n_items = n_items + 1 if item_def['classType'] < 3 or any( class_name in item_def['itemTypeDisplayName'].lower() for class_name in classnames): class_items = class_items + 1 slots = [] curr_slot = [] n_items = 0 i_week = 0 for item in curr_week: if n_items >= nweeks and n_items - class_items / 3 * 2 >= nweeks: i_week = i_week + 1 slots.append(list.copy(curr_slot)) n_items = 0 curr_slot = [] class_items = 0 if item['classType'] < 3 or any( class_name in item['itemTypeDisplayName'].lower() for class_name in classnames): class_items = class_items + 1 curr_slot.append(item) n_items += 1 slots.append(list.copy(curr_slot)) indexes = [0] * len(slots) for i in range(0, nweeks): curr_week = [] for slot in slots: if slot[indexes[slots.index(slot)]]['classType'] < 3 or any(class_name in slot[indexes[slots.index(slot)]]['itemTypeDisplayName'].lower() for class_name in classnames): curr_week = [*curr_week, *slot[indexes[slots.index(slot)]:indexes[slots.index(slot)] + 3]] indexes[slots.index(slot)] += 3 else: curr_week.append(slot[indexes[slots.index(slot)]]) indexes[slots.index(slot)] += 1 bd.append(list.copy(curr_week)) return bd async def get_weekly_eververse(self): langs = ['ru'] data = [] start = await self.get_season_start() bd = await self.get_seasonal_bd(langs, start) featured_bd = await self.get_seasonal_featured_bd(langs, start) # shaders = await self.get_seasonal_shaders(langs, start) # transmat = await self.get_seasonal_transmats(langs, start) # silver = await self.get_seasonal_featured_silver(langs, start) week_n = datetime.now(tz=timezone.utc) - await self.get_season_start() week_n = int(week_n.days / 7) - 14 for i in range(0, len(bd)): if week_n == i: week_str = 'Неделя {} (текущая)'.format(i + 1) else: week_str = 'Неделя {}'.format(i + 1) data.append({ 'name': week_str, 'items': [*bd[i]] }) if len(bd) == len(featured_bd): for i in range(0, len(bd)): data[i]['items'] = [*data[i]['items'], *featured_bd[i]] # if len(bd) == len(silver): # for i in range(0, len(bd)): # data[i]['items'] = [*data[i]['items'], *silver[i]] self.data_cursor.execute('''DROP TABLE evweekly''') self.data_cursor.execute('''CREATE TABLE "evweekly" ("items" TEXT)''') self.data_cursor.execute('''INSERT into evweekly VALUES (?)''', (str(data).replace('\"', '\\\"').replace('\'', '"'),)) self.data_db.commit() return { 'name': 'Эверверс', 'size': '', 'items': data[week_n]['items'], 'template': 'hover_items.html' } async def get_daily_rotations(self): rotations = [await self.get_spider(), await self.get_xur() # await self.get_strike_modifiers(), # await self.get_reckoning_modifiers(), # await self.get_heroic_story(size='tall'), # await self.get_forge() ] n_rotations = [] for rotation in rotations: if rotation: n_rotations.append(rotation) self.data_cursor.execute('''DROP TABLE dailyrotations''') self.data_cursor.execute('''CREATE TABLE "dailyrotations" ("items" TEXT)''') self.data_cursor.execute('''INSERT into dailyrotations VALUES (?)''', (str(n_rotations).replace('\"', '\\\"').replace('\'', '"'),)) self.data_db.commit() async def get_weekly_rotations(self): rotations = [#await self.get_nightfall820(), await self.get_raids(), await self.get_weekly_eververse(), await self.get_nightmares(), # await self.get_crucible_rotators(), await self.get_ordeal(), await self.get_drifter()] n_rotations = [] for rotation in rotations: if rotation: n_rotations.append(rotation) self.data_cursor.execute('''DROP TABLE weeklyrotations''') self.data_cursor.execute('''CREATE TABLE "weeklyrotations" ("items" TEXT)''') self.data_cursor.execute('''INSERT into weeklyrotations VALUES (?)''', (str(n_rotations).replace('\"', '\\\"').replace('\'', '"'),)) self.data_db.commit() async def decode_modifiers(self, key, lang): data = [] for mod_key in key['modifierHashes']: mod_def = 'DestinyActivityModifierDefinition' mod_json = await self.destiny.decode_hash(mod_key, mod_def, lang) mod = { "icon": mod_json['displayProperties']['icon'], "name": mod_json['displayProperties']['name'], "description": mod_json['displayProperties']['description'] } data.append(mod) return data async def get_activities_response(self, name, lang=None, string=None, force=False): char_info = self.char_info activities = [] hashes = set() for char in char_info['charid']: activities_url = 'https://www.bungie.net/platform/Destiny2/{}/Profile/{}/Character/{}/'. \ format(char_info['platform'], char_info['membershipid'], char) activities_resp = await self.get_cached_json('activities_{}'.format(char), name, activities_url, self.activities_params, lang, string, force=force) if activities_resp: activities.append(activities_resp) activities_json = await self.get_cached_json('activities_{}'.format(char_info['charid'][-1]), name, activities_url, self.activities_params, lang, string, force=force) if activities_json: activities_json['Response']['activities']['data']['availableActivities'].clear() if len(activities) == 0: return False else: if len(activities) > 0: for char_activities in activities: for activity in char_activities['Response']['activities']['data']['availableActivities']: if activity['activityHash'] not in hashes: activities_json['Response']['activities']['data']['availableActivities'].append(activity) hashes.add(activity['activityHash']) return activities_json async def get_vendor_sales(self, lang, vendor_resp, cats, exceptions=[]): data_sales = [] vendor_json = vendor_resp tess_sales = vendor_json['Response']['sales']['data'] n_order = 0 for key in cats: item = tess_sales[str(key)] item_hash = item['itemHash'] if item_hash not in exceptions: definition = 'DestinyInventoryItemDefinition' item_resp = await self.destiny.decode_hash(item_hash, definition, language=lang) item_name_list = item_resp['displayProperties']['name'].split() item_name = ' '.join(item_name_list) costs = [] currency_cost = 'N/A' currency_item = '' if len(item['costs']) == 0: currency_cost = 'N/A' currency_item = '' costs.append({ 'currency_name': currency_item.capitalize(), 'cost': currency_cost, }) for cost in item['costs']: currency = cost currency_resp = await self.destiny.decode_hash(currency['itemHash'], definition, language=lang) currency_cost = str(currency['quantity']) currency_item = currency_resp['displayProperties']['name'] costs.append({ 'currency_name': currency_item.capitalize(), 'cost': currency_cost, 'currency_icon': currency_resp['displayProperties']['icon'] }) if 'screenshot' in item_resp.keys(): screenshot = 'Screenshot'.format(item_resp['screenshot']) else: screenshot = '' stats = [] if str(item['vendorItemIndex']) in vendor_json['Response']['itemComponents']['stats']['data'].keys(): stats_json = vendor_json['Response']['itemComponents']['stats']['data'][str(item['vendorItemIndex'])]['stats'] for stat in stats_json: value = stats_json[stat]['value'] if value == 0: continue stat_def = await self.destiny.decode_hash(stats_json[stat]['statHash'], 'DestinyStatDefinition', language=lang) stats.append({ 'name': stat_def['displayProperties']['name'], 'value': stats_json[stat]['value'] }) perks = [] if str(item['vendorItemIndex']) in vendor_json['Response']['itemComponents']['perks']['data'].keys(): try: plugs_json = vendor_json['Response']['itemComponents']['reusablePlugs']['data'][str(item['vendorItemIndex'])]['plugs'] plug_str = 'plugItemHash' except KeyError: plugs_json = vendor_json['Response']['itemComponents']['sockets']['data'][str(item['vendorItemIndex'])]['sockets'] plug_str = 'plugHash' plug = [] for perk in plugs_json: if type(perk) == str: perk_list = plugs_json[perk] elif type(perk) == dict: perk_list = [perk] else: raise TypeError for perk_dict in perk_list: if plug_str in perk_dict.keys(): perk_def = await self.destiny.decode_hash(perk_dict[plug_str], 'DestinyInventoryItemDefinition', language=lang) if 'name' in perk_def['displayProperties'].keys() and 'icon' in perk_def['displayProperties'].keys(): plug.append({ 'name': perk_def['displayProperties']['name'], 'icon': 'https://bungie.net{}'.format(perk_def['displayProperties']['icon']) }) perks.append(plug) item_data = { 'id': '{}_{}_{}'.format(item['itemHash'], key, n_order), 'icon': item_resp['displayProperties']['icon'], 'name': item_name.capitalize(), 'description': "{}: {} {}".format('Цена', currency_cost, currency_item.capitalize()), 'tooltip_id': '{}_{}_{}_tooltip'.format(item['itemHash'], key, n_order), 'hash': item['itemHash'], 'screenshot': screenshot, 'costs': costs, 'stats': stats, 'perks': perks } n_order += 1 data_sales.append(item_data) return data_sales async def get_bungie_json(self, name, url, params=None, lang=None, string=None, change_msg=True): session = aiohttp.ClientSession() if lang is None: lang_str = 'ru' else: lang_str = lang if string is None: string = str(name) try: resp = await session.get(url, params=params, headers=self.headers) except: await session.close() return False try: resp_code = await resp.json() resp_code = resp_code['ErrorCode'] except KeyError: resp_code = 1 except json.decoder.JSONDecodeError: await session.close() return False except aiohttp.ContentTypeError: await session.close() return False print('getting {} {}'.format(string, lang_str)) curr_try = 2 while resp_code in self.wait_codes and curr_try <= self.max_retries: print('{}, attempt {}'.format(resp_code, curr_try)) resp = await session.get(url, params=params, headers=self.headers) try: resp_code = await resp.json() resp_code = resp_code['ErrorCode'] except aiohttp.ContentTypeError: resp_code = 1672 if resp_code == 5: curr_try -= 1 curr_try += 1 time.sleep(5) if not resp: try: resp_code = await resp.json() except aiohttp.ContentTypeError: await session.close() return False resp_code = resp_code['ErrorCode'] if resp_code == 5: await session.close() return False print("{} get error".format(name), json.dumps(resp.json(), indent=4, sort_keys=True) + "\n") await session.close() return False else: try: resp_code = await resp.json() except aiohttp.ContentTypeError: await session.close() return False if 'ErrorCode' in resp_code.keys(): resp_code = resp_code['ErrorCode'] if resp_code == 5: await session.close() return False else: for suspected_season in resp_code: if 'seasonNumber' in resp_code[suspected_season].keys(): await session.close() return resp_code await session.close() return await resp.json() async def get_chars(self): session = aiohttp.ClientSession() platform = 0 membership_id = '' try: char_file = open('char.json', 'r') self.char_info = json.loads(char_file.read()) await session.close() except FileNotFoundError: membership_url = 'https://www.bungie.net/platform/User/GetMembershipsForCurrentUser/' search_resp = await session.get(url=membership_url, headers=self.headers) search_json = await search_resp.json() self.char_info['membershipid'] = search_json['Response']['primaryMembershipId'] membership_id = search_json['Response']['primaryMembershipId'] for membership in search_json['Response']['destinyMemberships']: if membership['membershipId'] == self.char_info['membershipid']: platform = membership['membershipType'] self.char_info['platform'] = platform char_search_url = 'https://www.bungie.net/platform/Destiny2/{}/Profile/{}/'.format(platform, membership_id) char_search_params = { 'components': '200' } char_search_resp = await session.get(char_search_url, params=char_search_params, headers=self.headers) char_search_json = await char_search_resp.json() chars = char_search_json['Response']['characters']['data'] char_ids = [] for key in sorted(chars.keys()): char_ids.append(chars[key]['characterId']) self.char_info['charid'] = char_ids char_file = open('char.json', 'w') char_file.write(json.dumps(self.char_info)) await session.close() async def refresh_token(self, re_token): session = aiohttp.ClientSession() headers = { 'Content-Type': 'application/x-www-form-urlencoded' } params = { 'grant_type': 'refresh_token', 'refresh_token': re_token, 'client_id': self.api_data['id'], 'client_secret': self.api_data['secret'] } r = await session.post('https://www.bungie.net/platform/app/oauth/token/', data=params, headers=headers) while not r: print("re_token get error", json.dumps(r.json(), indent=4, sort_keys=True) + "\n") r = await session.post('https://www.bungie.net/platform/app/oauth/token/', data=params, headers=headers) if not r: r_json = await r.json() if not r_json['error_description'] == 'DestinyThrottledByGameServer': break time.sleep(5) await session.close() if not r: r_json = await r.json() print("re_token get error", json.dumps(r_json, indent=4, sort_keys=True) + "\n") return resp = await r.json() try: token = { 'refresh': resp['refresh_token'], 'expires': time.time() + resp['refresh_expires_in'] } token_file = open('token.json', 'w') token_file.write(json.dumps(token)) self.headers = { 'X-API-Key': self.api_data['key'], 'Authorization': 'Bearer ' + resp['access_token'] } except KeyError: pass self.destiny = pydest.Pydest(self.api_data['key']) async def token_update(self): # check to see if token.json exists, if not we have to start with oauth try: f = open('token.json', 'r') except FileNotFoundError: self.oauth.get_oauth() try: f = open('token.json', 'r') self.token = json.loads(f.read()) except json.decoder.JSONDecodeError: self.oauth.get_oauth() # check if token has expired, if so we have to oauth, if not just refresh the token if self.token['expires'] < time.time(): self.oauth.get_oauth() else: await self.refresh_token(self.token['refresh']) async def get_cached_json(self, cache_id, name, url, params=None, lang=None, string=None, change_msg=True, force=False, cache_only=False): cache_cursor = self.cache_db.cursor() try: cache_cursor.execute('''SELECT json, expires, timestamp from cache WHERE id=?''', (cache_id,)) cached_entry = cache_cursor.fetchone() if cached_entry is not None: expired = datetime.now().timestamp() > cached_entry[1] else: expired = True except sqlite3.OperationalError: expired = True if cache_only: return False if (expired or force) and not cache_only: response = await self.get_bungie_json(name, url, params, lang, string, change_msg) timestamp = datetime.utcnow().isoformat() if response: response_json = response try: cache_cursor.execute( '''CREATE TABLE cache (id text, expires integer, json text, timestamp text);''') cache_cursor.execute('''CREATE UNIQUE INDEX cache_id ON cache(id)''') cache_cursor.execute('''INSERT OR IGNORE INTO cache VALUES (?,?,?,?)''', (cache_id, int(datetime.now().timestamp() + 1800), json.dumps(response_json), timestamp)) except sqlite3.OperationalError: try: cache_cursor.execute('''ALTER TABLE cache ADD COLUMN timestamp text''') cache_cursor.execute('''INSERT OR IGNORE INTO cache VALUES (?,?,?,?)''', (cache_id, int(datetime.now().timestamp() + 1800), json.dumps(response_json), timestamp)) except sqlite3.OperationalError: pass try: cache_cursor.execute('''INSERT OR IGNORE INTO cache VALUES (?,?,?,?)''', (cache_id, int(datetime.now().timestamp() + 1800), json.dumps(response_json), timestamp)) except sqlite3.OperationalError: pass try: cache_cursor.execute('''UPDATE cache SET expires=?, json=?, timestamp=? WHERE id=?''', (int(datetime.now().timestamp() + 1800), json.dumps(response_json), timestamp, cache_id)) except sqlite3.OperationalError: pass else: return False else: timestamp = cached_entry[2] response_json = json.loads(cached_entry[0]) self.cache_db.commit() response_json['timestamp'] = timestamp return response_json