cloud_storage_utility.platforms.ibm_cloud_storage
View Source
```python
import base64
import hashlib
import logging
import time
from typing import Dict

import xmltodict

from ..common.base_cloud_storage import BaseCloudStorage
from ..types.bucket_key import BucketKeyMetadata
from ..types.ibm_configuration import IbmConfiguration


class IbmCloudStorage(BaseCloudStorage):
    """File operation implementations for IBM platform."""

    def __init__(self, session, ibm_config: IbmConfiguration):
        super().__init__()
        self.__api_key = ibm_config.api_key
        self.__auth_endpoint = ibm_config.auth_endpoint
        self.__cos_endpoint = ibm_config.cos_endpoint
        self.__session = session
        self.__expires_at = -1
        self.__access_token = ""

    async def get_bucket_keys(
        self, bucket_name: str, prefix: str = ""
    ) -> Dict[str, BucketKeyMetadata]:
        try:
            access_token = await self.__get_auth_token()
            headers = {
                "Authorization": f"Bearer {access_token}",
            }

            params: Dict[str, str] = {}
            if prefix:
                params = {"prefix": prefix.strip(), **params}

            all_items = {}
            is_truncated = True
            continuation_token = None
            while is_truncated:
                if continuation_token:
                    params = {
                        "continuation-token": continuation_token,
                        "prefix": prefix,
                    }

                async with self.__session.get(
                    f"{self.__cos_endpoint}/{bucket_name}?list-type=2",
                    params=params,
                    headers=headers,
                ) as response:
                    xml_response = await response.text()
                    response_dict = xmltodict.parse(xml_response)["ListBucketResult"]

                    if "Contents" in response_dict:
                        # xmltodict quirk: a single key parses to a dict,
                        # multiple keys parse to a list of dicts
                        if "Key" in response_dict["Contents"]:
                            item = response_dict["Contents"]
                            items = {
                                item["Key"]: BucketKeyMetadata(
                                    last_modified=item["LastModified"],
                                    bytes=item["Size"],
                                )
                            }
                        else:
                            items = {
                                item["Key"]: BucketKeyMetadata(
                                    last_modified=item["LastModified"],
                                    bytes=item["Size"],
                                )
                                for item in response_dict["Contents"]
                            }
                        all_items.update(items)

                    is_truncated = response_dict["IsTruncated"] == "true"
                    if is_truncated:
                        continuation_token = response_dict["NextContinuationToken"]
        except Exception as error:
            logging.exception(error)
            return {}
        return dict(all_items)

    async def upload_file(
        self,
        bucket_name,
        cloud_key,
        file_path,
        prefix="",
        callback=None,
    ) -> bool:
        """
        Note: This should only be used for files < 500MB. When you need to
        upload larger files, you have to implement multi-part uploads.
        """
        upload_succeeded = None
        try:
            access_token = await self.__get_auth_token()
            headers = {
                "Authorization": f"Bearer {access_token}",
            }

            if prefix:
                cloud_key = f"{prefix}{cloud_key}"

            with open(file_path, "rb") as file_data:
                async with self.__session.put(
                    f"{self.__cos_endpoint}/{bucket_name}/{cloud_key}",
                    data=file_data,
                    headers=headers,
                ):
                    pass
            upload_succeeded = True
        except Exception as error:
            logging.exception(error)
            upload_succeeded = False
        finally:
            if callback is not None:
                callback(
                    bucket_name=bucket_name,
                    cloud_key=cloud_key,
                    file_path=file_path,
                    succeeded=upload_succeeded,
                )
        return upload_succeeded

    async def remove_item(self, bucket_name, delete_request, callback=None) -> bool:
        removal_succeeded = True
        file_list = []  # keeps the callback safe if the request fails early
        try:
            xml_body = xmltodict.unparse({"Delete": delete_request})
            access_token = await self.__get_auth_token()
            md = hashlib.md5(xml_body.encode("utf-8")).digest()
            contents_md5 = base64.b64encode(md).decode("utf-8")
            headers = {
                "Authorization": f"Bearer {access_token}",
                "Content-MD5": contents_md5,
                "Content-Type": "text/plain; charset=utf-8",
            }
            async with self.__session.post(
                f"{self.__cos_endpoint}/{bucket_name}?delete=",
                headers=headers,
                data=xml_body,
            ) as response:
                dict_response = xmltodict.parse(await response.text())["DeleteResult"][
                    "Deleted"
                ]
                # This is just dealing with a quirk in the xml parser, if only
                # s3 used json like a normal person :(
                if "Key" in dict_response:
                    file_list = [dict_response["Key"]]
                else:
                    file_list = [elem["Key"] for elem in dict_response]
        except Exception as error:
            logging.exception(error)
            removal_succeeded = False
        finally:
            if callback is not None:
                callback(bucket_name, file_list)
        return removal_succeeded

    # Overriding the parent function because we can make it more efficient
    def get_remove_items_coroutines(self, bucket_name, item_names, callback=None):
        # do nothing
        if len(item_names) == 0:
            return []

        # the cloud only allows up to 1000 keys per request, so we may
        # need many requests
        delete_requests = []
        delete_tasks = []
        try:
            request = []
            for i, name in enumerate(item_names):
                request.append({"Key": name})
                # every time the index is a mod of 1000, we know that's a
                # complete request
                if (i + 1) % 1000 == 0:
                    delete_requests.append({"Object": request})
                    # reset request list for the next iteration
                    request = []
            # append whatever is left over
            if len(request) > 0:
                delete_requests.append({"Object": request})
        except Exception as error:
            logging.exception(error)

        for request in delete_requests:
            delete_tasks.append(self.remove_item(bucket_name, request, callback))
        return delete_tasks

    async def download_file(
        self,
        bucket_name,
        cloud_key,
        destination_filepath,
        prefix: str = "",
        callback=None,
    ) -> bool:
        download_succeeded = None
        try:
            access_token = await self.__get_auth_token()
            headers = {"Authorization": f"Bearer {access_token}"}

            if prefix:
                # honor the prefix the same way upload_file does
                cloud_key = f"{prefix}{cloud_key}"

            async with self.__session.get(
                f"{self.__cos_endpoint}/{bucket_name}/{cloud_key}",
                headers=headers,
            ) as response:
                with open(destination_filepath, "wb") as downloaded_file:
                    downloaded_file.write(await response.read())
            download_succeeded = True
        except Exception as error:
            logging.exception(error)
            download_succeeded = False
        finally:
            if callback is not None:
                callback(
                    bucket_name=bucket_name,
                    cloud_key=cloud_key,
                    file_path=destination_filepath,
                    succeeded=download_succeeded,
                )
        return download_succeeded

    async def __get_auth_token(self):
        current_time = int(time.time())
        # reuse the cached token until it expires
        if current_time >= self.__expires_at:
            headers = {
                "content-type": "application/x-www-form-urlencoded",
                "accept": "application/json",
            }
            data = (
                "grant_type=urn%3Aibm%3Aparams%3Aoauth%3Agrant-type%3Aapikey"
                f"&apikey={self.__api_key}"
            )
            async with self.__session.post(
                self.__auth_endpoint, headers=headers, data=data
            ) as response:
                response = await response.json()
                self.__expires_at = response["expiration"]
                self.__access_token = response["access_token"]
        return self.__access_token
```
class IbmCloudStorage(BaseCloudStorage)
File operation implementations for IBM platform.
View Source
```python
def __init__(self, session, ibm_config: IbmConfiguration):
    super().__init__()
    self.__api_key = ibm_config.api_key
    self.__auth_endpoint = ibm_config.auth_endpoint
    self.__cos_endpoint = ibm_config.cos_endpoint
    self.__session = session
    self.__expires_at = -1
    self.__access_token = ""
```
Sets up platform-independent configuration and operations.
Args
- part_size (int, optional): The size of the chunks (how to divide up large files). Defaults to 5MB.
- file_threshold (int, optional): How large a file needs to be before performing operations in chunks. Defaults to 15MB.
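A minimal construction sketch. The session type isn't pinned down on this page; an `aiohttp.ClientSession` matches the `async with session.get(...)/put(...)` usage in the source. The `IbmConfiguration` keyword arguments mirror the attributes `__init__` reads and are an assumption, as are the endpoints shown:

```python
import asyncio

import aiohttp

from cloud_storage_utility.platforms.ibm_cloud_storage import IbmCloudStorage
from cloud_storage_utility.types.ibm_configuration import IbmConfiguration


async def main():
    # Assumed kwargs: they mirror the attributes __init__ reads off the
    # config object; the endpoints below are illustrative placeholders.
    config = IbmConfiguration(
        api_key="YOUR_API_KEY",
        auth_endpoint="https://iam.cloud.ibm.com/identity/token",
        cos_endpoint="https://s3.us-south.cloud-object-storage.appdomain.cloud",
    )
    async with aiohttp.ClientSession() as session:
        storage = IbmCloudStorage(session, config)
        print(await storage.get_bucket_keys("my-bucket"))


asyncio.run(main())
```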
View Source
```python
async def get_bucket_keys(
    self, bucket_name: str, prefix: str = ""
) -> Dict[str, BucketKeyMetadata]:
    try:
        access_token = await self.__get_auth_token()
        headers = {
            "Authorization": f"Bearer {access_token}",
        }

        params: Dict[str, str] = {}
        if prefix:
            params = {"prefix": prefix.strip(), **params}

        all_items = {}
        is_truncated = True
        continuation_token = None
        while is_truncated:
            if continuation_token:
                params = {
                    "continuation-token": continuation_token,
                    "prefix": prefix,
                }

            async with self.__session.get(
                f"{self.__cos_endpoint}/{bucket_name}?list-type=2",
                params=params,
                headers=headers,
            ) as response:
                xml_response = await response.text()
                response_dict = xmltodict.parse(xml_response)["ListBucketResult"]

                if "Contents" in response_dict:
                    # xmltodict quirk: a single key parses to a dict,
                    # multiple keys parse to a list of dicts
                    if "Key" in response_dict["Contents"]:
                        item = response_dict["Contents"]
                        items = {
                            item["Key"]: BucketKeyMetadata(
                                last_modified=item["LastModified"],
                                bytes=item["Size"],
                            )
                        }
                    else:
                        items = {
                            item["Key"]: BucketKeyMetadata(
                                last_modified=item["LastModified"],
                                bytes=item["Size"],
                            )
                            for item in response_dict["Contents"]
                        }
                    all_items.update(items)

                is_truncated = response_dict["IsTruncated"] == "true"
                if is_truncated:
                    continuation_token = response_dict["NextContinuationToken"]
    except Exception as error:
        logging.exception(error)
        return {}
    return dict(all_items)
```
Lists the contents of a bucket, optionally filtered by a key prefix.
Args
- bucket_name (str): Target bucket.
- prefix (str, optional): Only get keys that match this prefix.
Returns
Dict[str, BucketKeyMetadata]: Dictionary mapping key name -> BucketKeyMetadata, e.g.

```python
{
    "image.jpeg": {"bytes": 32, "last_modified": 1619195172},
    "file.txt": {"bytes": 32, "last_modified": 1619195172},
}
```
View Source
```python
async def upload_file(
    self,
    bucket_name,
    cloud_key,
    file_path,
    prefix="",
    callback=None,
) -> bool:
    """
    Note: This should only be used for files < 500MB. When you need to
    upload larger files, you have to implement multi-part uploads.
    """
    upload_succeeded = None
    try:
        access_token = await self.__get_auth_token()
        headers = {
            "Authorization": f"Bearer {access_token}",
        }

        if prefix:
            cloud_key = f"{prefix}{cloud_key}"

        with open(file_path, "rb") as file_data:
            async with self.__session.put(
                f"{self.__cos_endpoint}/{bucket_name}/{cloud_key}",
                data=file_data,
                headers=headers,
            ):
                pass
        upload_succeeded = True
    except Exception as error:
        logging.exception(error)
        upload_succeeded = False
    finally:
        if callback is not None:
            callback(
                bucket_name=bucket_name,
                cloud_key=cloud_key,
                file_path=file_path,
                succeeded=upload_succeeded,
            )
    return upload_succeeded
```
Note: This should only be used for files under 500 MB; uploading larger files requires implementing multi-part uploads.
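A small upload sketch. The callback signature (keyword arguments `bucket_name`, `cloud_key`, `file_path`, `succeeded`) comes straight from the source above; the bucket and paths are placeholders:

```python
def report(bucket_name, cloud_key, file_path, succeeded):
    status = "ok" if succeeded else "failed"
    print(f"{file_path} -> {bucket_name}/{cloud_key}: {status}")


async def upload_report(storage: IbmCloudStorage):
    # prefix is prepended to cloud_key, so this lands at "backups/report.pdf"
    return await storage.upload_file(
        "my-bucket",
        "report.pdf",
        "/tmp/report.pdf",
        prefix="backups/",
        callback=report,
    )
```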
View Source
```python
async def remove_item(self, bucket_name, delete_request, callback=None) -> bool:
    removal_succeeded = True
    file_list = []  # keeps the callback safe if the request fails early
    try:
        xml_body = xmltodict.unparse({"Delete": delete_request})
        access_token = await self.__get_auth_token()
        md = hashlib.md5(xml_body.encode("utf-8")).digest()
        contents_md5 = base64.b64encode(md).decode("utf-8")
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-MD5": contents_md5,
            "Content-Type": "text/plain; charset=utf-8",
        }
        async with self.__session.post(
            f"{self.__cos_endpoint}/{bucket_name}?delete=",
            headers=headers,
            data=xml_body,
        ) as response:
            dict_response = xmltodict.parse(await response.text())["DeleteResult"][
                "Deleted"
            ]
            # This is just dealing with a quirk in the xml parser, if only
            # s3 used json like a normal person :(
            if "Key" in dict_response:
                file_list = [dict_response["Key"]]
            else:
                file_list = [elem["Key"] for elem in dict_response]
    except Exception as error:
        logging.exception(error)
        removal_succeeded = False
    finally:
        if callback is not None:
            callback(bucket_name, file_list)
    return removal_succeeded
```
Sends a single bulk-removal request for a batch of keys.
Args
- bucket_name (str): Target bucket.
- delete_request (dict): The batched delete payload, e.g. {"Object": [{"Key": name}, ...]}, with at most 1000 keys.
- callback (Callable[[str, List[str]], None], optional): Called with the bucket name and the list of deleted keys after the operation completes. Defaults to None.
Returns
bool: Whether the removal was successful.
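The payload shape below matches what get_remove_items_coroutines builds; bucket and key names are placeholders:

```python
async def delete_two(storage: IbmCloudStorage):
    request = {"Object": [{"Key": "old/a.txt"}, {"Key": "old/b.txt"}]}
    # Returns False (and logs the exception) if the request fails.
    return await storage.remove_item("my-bucket", request)
```

For lists of arbitrary size, prefer get_remove_items_coroutines below, which handles the 1000-key-per-request limit for you.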
View Source
```python
def get_remove_items_coroutines(self, bucket_name, item_names, callback=None):
    # do nothing
    if len(item_names) == 0:
        return []

    # the cloud only allows up to 1000 keys per request, so we may
    # need many requests
    delete_requests = []
    delete_tasks = []
    try:
        request = []
        for i, name in enumerate(item_names):
            request.append({"Key": name})
            # every time the index is a mod of 1000, we know that's a
            # complete request
            if (i + 1) % 1000 == 0:
                delete_requests.append({"Object": request})
                # reset request list for the next iteration
                request = []
        # append whatever is left over
        if len(request) > 0:
            delete_requests.append({"Object": request})
    except Exception as error:
        logging.exception(error)

    for request in delete_requests:
        delete_tasks.append(self.remove_item(bucket_name, request, callback))
    return delete_tasks
```
Get a list of all the coroutines needed to perform the requested removal.
Args
- bucket_name (str): Target bucket.
- item_names (List[str]): Items to remove from the bucket.
- callback (Callable[[str, List[str]], None], optional): Passed through to each remove_item coroutine. Defaults to None.
Returns
List[Coroutine[Any, Any, bool]]: List of coroutines which, when awaited, fulfill the operation.
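A sketch of driving the returned coroutines with asyncio.gather; the bucket name and key list are placeholders:

```python
import asyncio


async def purge(storage: IbmCloudStorage, names):
    # One coroutine per batch of up to 1000 keys; run the batches together.
    tasks = storage.get_remove_items_coroutines("my-bucket", names)
    results = await asyncio.gather(*tasks)
    return all(results)  # True only if every batch succeeded
```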
View Source
```python
async def download_file(
    self,
    bucket_name,
    cloud_key,
    destination_filepath,
    prefix: str = "",
    callback=None,
) -> bool:
    download_succeeded = None
    try:
        access_token = await self.__get_auth_token()
        headers = {"Authorization": f"Bearer {access_token}"}

        if prefix:
            # honor the prefix the same way upload_file does
            cloud_key = f"{prefix}{cloud_key}"

        async with self.__session.get(
            f"{self.__cos_endpoint}/{bucket_name}/{cloud_key}",
            headers=headers,
        ) as response:
            with open(destination_filepath, "wb") as downloaded_file:
                downloaded_file.write(await response.read())
        download_succeeded = True
    except Exception as error:
        logging.exception(error)
        download_succeeded = False
    finally:
        if callback is not None:
            callback(
                bucket_name=bucket_name,
                cloud_key=cloud_key,
                file_path=destination_filepath,
                succeeded=download_succeeded,
            )
    return download_succeeded
```
Downloads a single file from a bucket to a local path.
Args
- bucket_name (str): Target bucket.
- cloud_key (str): The name of the item we want to download from the cloud bucket.
- destination_filepath (str): Where to put the downloaded item.
- prefix (str, optional): Prefix prepended to cloud_key to locate the object, mirroring upload_file.
- callback (Callable[[str, str, str, bool], None], optional): Implementations of this method need to call this after the operation is complete. Defaults to None.
Returns
bool: Whether the download was successful or not.
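A download sketch with placeholder names. Note that the implementation buffers the whole object in memory (await response.read()) before writing it, so very large objects may call for a streamed variant:

```python
async def fetch_report(storage: IbmCloudStorage):
    ok = await storage.download_file(
        "my-bucket",
        "report.pdf",
        "/tmp/report.pdf",
        prefix="backups/",
    )
    if not ok:
        print("download failed; the exception was logged")
```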