Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload e Download de ativos digitais grandes #187

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 115 additions & 57 deletions opac_ssm_api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import grpc
import json
import logging
import tempfile
from imp import reload

from grpc_health.v1 import health_pb2
Expand All @@ -21,8 +22,8 @@
PROTO_PATH = os.getenv('OPAC_SSM_PROTO_FILE_PATH', '/scieloorg/opac_ssm/master/grpc_ssm/opac.proto')
PROTO_UPDATE = os.getenv('OPAC_SSM_PROTO_UPDATE', 'False') == 'True'

MAX_RECEIVE_MESSAGE_LENGTH = int(os.getenv('MAX_RECEIVE_MESSAGE_LENGTH', 90 * 1024 * 1024)) # 90MB
MAX_SEND_MESSAGE_LENGTH = int(os.getenv('MAX_SEND_MESSAGE_LENGTH', 90 * 1024 * 1024)) # 90MB
MAX_RECEIVE_MESSAGE_LENGTH = int(os.getenv('MAX_RECEIVE_MESSAGE_LENGTH', 5 * 1024 * 1024)) # 5MB
MAX_SEND_MESSAGE_LENGTH = int(os.getenv('MAX_SEND_MESSAGE_LENGTH', 5 * 1024 * 1024)) # 5MB

try:
from opac_ssm_api import opac_pb2_grpc, opac_pb2
Expand All @@ -32,6 +33,73 @@
from opac_ssm_api import opac_pb2_grpc, opac_pb2


# Number of bytes read from disk for each streamed piece of a file.
CHUNK_SIZE = 1024 * 1024 # 1MB
# Files bigger than this are sent through the chunked upload instead of
# being embedded in the Asset message.
MAX_UPLOADED_FILE_SIZE = 2 * CHUNK_SIZE


def get_file_chunks(filename):
    """
    Lazily read ``filename`` in binary mode and yield its content in pieces.

    Each piece holds at most CHUNK_SIZE bytes and is wrapped in an
    ``opac_pb2.File`` message through its ``buffer`` field. Iteration stops
    at the first empty read (end of file).
    """
    with open(filename, 'rb') as source:
        # iter() with a sentinel keeps calling read() until it returns b''.
        for block in iter(lambda: source.read(CHUNK_SIZE), b''):
            yield opac_pb2.File(buffer=block)


def save_chunks_to_file(chunks, filename):
    """
    Write the ``buffer`` of every chunk to ``filename``, in iteration order.

    The destination is opened in binary write mode, so any existing content
    is replaced.
    """
    with open(filename, 'wb') as target:
        target.writelines(part.buffer for part in chunks)


def save_tmpfile(content, file_path=None):
    """
    Write ``content`` (bytes) to a file and return the path of that file.

    When ``file_path`` is None a new temporary file is created with
    ``tempfile.mkstemp``; otherwise the given path is (over)written.
    The file is not removed here: the caller is responsible for deleting it.
    """
    if file_path is None:
        tmphandle, file_path = tempfile.mkstemp()
        # mkstemp returns an already-open OS-level descriptor. Write through
        # it so it is closed together with the file object instead of leaking.
        with os.fdopen(tmphandle, 'wb') as f:
            f.write(content)
        return file_path

    with open(file_path, 'wb') as f:
        f.write(content)
    return file_path


def delete_temp_file(file_path):
    """
    Remove ``file_path`` from disk on a best-effort basis.

    A failure to delete (any OSError raised by ``os.unlink``) is logged as an
    error and swallowed, so cleanup never aborts the caller.
    """
    try:
        os.unlink(file_path)
    except OSError as exc:
        # Say what failed and why; the bare path alone was not actionable.
        logger.error('Unable to delete temporary file %s: %s', file_path, exc)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.info('%s' % file_path)
logger.error('%s' % file_path)



def get_file_path(content_p, filename):
    """
    Resolve ``content_p`` to a path on disk.

    ``content_p`` may be a file path or a file-like object (anything with a
    ``read`` method, such as a file pointer or an in-memory buffer):

    - file-like object with a ``name`` attribute: that name is returned;
    - file-like object without ``name``: its content is written, in binary
      mode, to a new file called ``filename`` which is reported as temporary
      so the caller can delete it afterwards;
    - path of a readable regular file: returned unchanged.

    Returns a tuple: (file path, is_temp_file).
    Raises IOError when ``content_p`` is not a readable file, or when a
    nameless file-like object is given without ``filename``.
    """
    # File-like objects are handled first: os.path.isfile() raises TypeError
    # when it receives an object that is not a path, which used to make this
    # branch unreachable.
    if hasattr(content_p, 'read'):
        if hasattr(content_p, 'name'):
            return content_p.name, False

        if not filename:
            error_msg = 'Param "filename" is required'
            logger.error(error_msg)
            raise IOError(error_msg)

        with open(filename, 'wb') as f:
            f.write(content_p.read())
        return filename, True

    try:
        is_regular_file = os.path.isfile(content_p)
    except TypeError:
        # Not a path-like value at all (e.g. None): report it as IOError below.
        is_regular_file = False

    if is_regular_file:
        if not os.access(content_p, os.R_OK):
            error_msg = '{} is not a readable file'.format(content_p)
            # The message is already formatted; no extra logging args.
            logger.error(error_msg)
            raise IOError(error_msg)
        return content_p, False

    # Format once so the log record and the exception carry the same text.
    error_msg = "The file pointed: (%s) is not a file or is unreadable." % (content_p,)
    logger.error(error_msg)
    raise IOError(error_msg)


class Client(object):

def __init__(self, host=HOST_NAME, port=HOST_PORT, proto_http_port=HTTP_PROTO_PORT,
Expand Down Expand Up @@ -105,38 +173,34 @@ def add_asset(self, pfile, filename='', filetype='', metadata='',
Raise ValueError if not set filename when pfile is a file pointer
Raise IOError if pfile is not a file or cant read the file
"""
if not metadata:
metadata = {}
elif not isinstance(metadata, dict):
error_msg = 'Param "metadata" must be a Dict or None.'
logger.error(error_msg)
raise ValueError(error_msg)

if hasattr(pfile, 'read'):
if not filename:
error_msg = 'Param "filename" is required'
logger.error(error_msg)
raise ValueError(error_msg)
else:
filename = filename
file_content = pfile.read()
else:
if os.path.isfile(pfile) and os.access(pfile, os.R_OK):
with open(pfile, 'rb') as fp:
filename = os.path.basename(getattr(fp, 'name', None))
file_content = fp.read()
else:
error_msg = "The file pointed: (%s) is not a file or is unreadable."
logger.error(error_msg, pfile)
raise IOError(error_msg)
file_path, is_temp_file = get_file_path(pfile, filename)
filename = os.path.basename(file_path)

if os.path.getsize(file_path) > MAX_UPLOADED_FILE_SIZE:
chunks_generator = get_file_chunks(file_path)
large_file_path = self.stubAsset.upload(chunks_generator)
asset = opac_pb2.Asset(
file='',
filename=filename,
type=filetype,
metadata=json.dumps(metadata),
bucket=bucket_name,
large_file_path=large_file_path
)
if is_temp_file:
delete_temp_file(file_path)
return self.stubAsset.add_asset(asset).id

asset = opac_pb2.Asset(
file=file_content,
file=open(file_path).read(),
filename=filename,
type=filetype,
metadata=json.dumps(metadata),
bucket=bucket_name
bucket=bucket_name,
large_file_path=''
)
if is_temp_file:
delete_temp_file(file_path)

return self.stubAsset.add_asset(asset).id

Expand All @@ -159,6 +223,13 @@ def get_asset(self, _id):
raise ValueError(msg)
try:
asset = self.stubAsset.get_asset(opac_pb2.TaskId(id=_id))
if asset.large_file_path:
out_file_name = save_tmpfile(asset.file)
response = self.stubAsset.download(
opac_pb2.Request(large_file_path=asset.large_file_path))
save_chunks_to_file(response, out_file_name)
asset.file = open(out_file_name, 'rb').read()
delete_temp_file(out_file_name)
except Exception as e:
logger.error(e)
return (False, {'error_message': e.details()})
Expand Down Expand Up @@ -314,8 +385,8 @@ def get_task_state(self, _id):

return task_state.state

def update_asset(self, uuid, pfile=None, filename=None, filetype=None, metadata=None,
bucket_name=None):
def update_asset(self, uuid, pfile=None, filename=None, filetype=None,
metadata=None, bucket_name=None):
"""
Update asset to SSM.

Expand All @@ -331,45 +402,32 @@ def update_asset(self, uuid, pfile=None, filename=None, filetype=None, metadata=

Raise ValueError if param uuid is not a str|unicode
"""

if not isinstance(uuid, six.string_types):
raise ValueError('Param "uuid" must be a str|unicode.')

update_params = {}

if self.stubAsset.exists_asset(opac_pb2_grpc.TaskId(id=uuid)):

update_params = {}
update_params['uuid'] = uuid

if not metadata:
update_params['metadata'] = json.dumps({})
elif not isinstance(metadata, dict):
metadata = metadata or {}
if not isinstance(metadata, dict):
error_msg = 'Param "metadata" must be a Dict or None.'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
error_msg = 'Param "metadata" must be a Dict or None.'
error_msg = 'Param "metadata" must be a Dict.'

logger.exception(error_msg)
raise ValueError(error_msg)
else:
update_params['metadata'] = json.dumps(metadata)
update_params['metadata'] = json.dumps(metadata)

if pfile is not None:
if hasattr(pfile, 'read'):
if not filename:
error_msg = 'Param "filename" is required'
logger.exception(error_msg)
raise IOError(error_msg)
else:
filename = filename
file_content = pfile.read()
file_path, is_temp_file = get_file_path(pfile, filename)
filename = os.path.basename(file_path)

if os.path.getsize(file_path) > MAX_UPLOADED_FILE_SIZE:
chunks_generator = get_file_chunks(file_path)
update_params['large_file_path'] = self.stubAsset.upload(chunks_generator)
update_params['file'] = b''
else:
if os.path.isfile(pfile) and os.access(pfile, os.R_OK):
with open(pfile, 'rb') as fp:
filename = os.path.basename(getattr(fp, 'name', None))
file_content = fp.read()
else:
error_msg = "The file pointed: (%s) is not a file or is unreadable."
logger.error(error_msg, pfile)
raise IOError(error_msg)

update_params['file'] = file_content
update_params['file'] = open(file_path, 'rb').read()
if is_temp_file:
delete_temp_file(file_path)
update_params['filename'] = filename

if filetype:
Expand Down