Upload and download of large digital assets
Changes to add_asset and update_asset
Changes to get_asset
If the file is large (see the caller-side sketch below):
- add_asset and update_asset run in two steps: opac_ssm.upload_file is executed first and returns the name of the large file, then opac_ssm.add_asset is executed including asset.large_file_path.
- get_asset also runs in two steps: opac_ssm.get_asset is executed first, then opac_ssm.download_file, using the file identified by asset.large_file_path.
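
For illustration, a minimal caller-side sketch (hypothetical file name, metadata and bucket; it assumes default connection settings and a file larger than the 2 MB MAX_UPLOADED_FILE_SIZE threshold). The two-step upload and download happen inside add_asset and get_asset, so nothing changes for the caller:

    from opac_ssm_api.client import Client

    client = Client()  # host/port come from the environment defaults

    # Large file: add_asset streams the content first, then registers the
    # asset carrying only the returned large_file_path.
    asset_id = client.add_asset('report.pdf', filetype='pdf',
                                metadata={'pid': 'S0000-0000'},
                                bucket_name='articles')

    # get_asset fetches the record and, when large_file_path is set, streams
    # the content back; on failure it returns (False, {'error_message': ...}).
    result = client.get_asset(asset_id)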

Related to
scieloorg/opac_ssm#390

tk185

Fixes scieloorg#185
robertatakenaka committed Oct 22, 2018
1 parent 002f74b commit 8d4a9d8
Showing 1 changed file with 115 additions and 57 deletions.
172 changes: 115 additions & 57 deletions opac_ssm_api/client.py
@@ -4,6 +4,7 @@
import grpc
import json
import logging
import tempfile
from imp import reload

from grpc_health.v1 import health_pb2
@@ -21,8 +22,8 @@
PROTO_PATH = os.getenv('OPAC_SSM_PROTO_FILE_PATH', '/scieloorg/opac_ssm/master/grpc_ssm/opac.proto')
PROTO_UPDATE = os.getenv('OPAC_SSM_PROTO_UPDATE', 'False') == 'True'

MAX_RECEIVE_MESSAGE_LENGTH = int(os.getenv('MAX_RECEIVE_MESSAGE_LENGTH', 90 * 1024 * 1024)) # 90MB
MAX_SEND_MESSAGE_LENGTH = int(os.getenv('MAX_SEND_MESSAGE_LENGTH', 90 * 1024 * 1024)) # 90MB
MAX_RECEIVE_MESSAGE_LENGTH = int(os.getenv('MAX_RECEIVE_MESSAGE_LENGTH', 5 * 1024 * 1024)) # 5MB
MAX_SEND_MESSAGE_LENGTH = int(os.getenv('MAX_SEND_MESSAGE_LENGTH', 5 * 1024 * 1024)) # 5MB

try:
from opac_ssm_api import opac_pb2_grpc, opac_pb2
@@ -32,6 +33,73 @@
from opac_ssm_api import opac_pb2_grpc, opac_pb2


CHUNK_SIZE = 1024 * 1024 # 1MB
MAX_UPLOADED_FILE_SIZE = 2 * CHUNK_SIZE
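# Files above MAX_UPLOADED_FILE_SIZE are streamed to the SSM in CHUNK_SIZE
# pieces instead of being embedded inline in the Asset message.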


def get_file_chunks(filename):
with open(filename, 'rb') as f:
while True:
piece = f.read(CHUNK_SIZE)
if len(piece) == 0:
return
yield opac_pb2.File(buffer=piece)


def save_chunks_to_file(chunks, filename):
with open(filename, 'wb') as f:
for chunk in chunks:
f.write(chunk.buffer)
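
# How the two helpers above pair with the streaming RPCs used further down
# (a sketch: `stubAsset`, `opac_pb2.File` and `opac_pb2.Request` come from this
# module; `client` and the local paths are hypothetical):
#
#   chunks = get_file_chunks('/tmp/big.pdf')          # yields opac_pb2.File chunks
#   remote_name = client.stubAsset.upload(chunks)     # client-streaming upload
#   stream = client.stubAsset.download(
#       opac_pb2.Request(large_file_path=remote_name))
#   save_chunks_to_file(stream, '/tmp/big-copy.pdf')  # reassemble locally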


def save_tmpfile(content, file_path=None):
if file_path is None:
tmphandle, file_path = tempfile.mkstemp()
with open(file_path, 'wb') as f:
f.write(content)
return file_path


def delete_temp_file(file_path):
try:
os.unlink(file_path)
except OSError:
logger.info('Could not remove temp file: %s', file_path)


def get_file_path(content_p, filename):
"""
content_p pode ser filepointer, file_path, StringIO ou similares
cria temp_file, cujo nome é filename, se content_p não é um arquivo ou
não é um file pointer
Retorna tupla: (path do arquivo, is_temp_file)
Retorna IOError em outros casos
"""
if isinstance(content_p, six.string_types) and os.path.isfile(content_p):  # os.path.isfile() rejects file objects
if not os.access(content_p, os.R_OK):
error_msg = '{} is not a readable file'.format(content_p)
logger.error(error_msg)
raise IOError(error_msg)
return content_p, False

elif hasattr(content_p, 'read'):
if hasattr(content_p, 'name'):
return content_p.name, False

if not filename:
error_msg = 'Param "filename" is required'
logger.error(error_msg)
raise IOError(error_msg)

with open(filename, 'wb') as f:
f.write(content_p.read())
return filename, True

error_msg = "The file pointed to ({}) is not a file or is unreadable.".format(content_p)
logger.error(error_msg)
raise IOError(error_msg)
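
# Illustrative calls (hypothetical paths, assumed to exist and be readable):
#   get_file_path('/tmp/report.pdf', '')              -> ('/tmp/report.pdf', False)
#   get_file_path(open('/tmp/report.pdf', 'rb'), '')  -> ('/tmp/report.pdf', False)
#   get_file_path(io.BytesIO(b'...'), 'report.pdf')   -> ('report.pdf', True)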


class Client(object):

def __init__(self, host=HOST_NAME, port=HOST_PORT, proto_http_port=HTTP_PROTO_PORT,
@@ -105,38 +173,34 @@ def add_asset(self, pfile, filename='', filetype='', metadata='',
Raise ValueError if filename is not set when pfile is a file pointer
Raise IOError if pfile is not a file or the file cannot be read
"""
if not metadata:
metadata = {}
elif not isinstance(metadata, dict):
error_msg = 'Param "metadata" must be a Dict or None.'
logger.error(error_msg)
raise ValueError(error_msg)

if hasattr(pfile, 'read'):
if not filename:
error_msg = 'Param "filename" is required'
logger.error(error_msg)
raise ValueError(error_msg)
else:
filename = filename
file_content = pfile.read()
else:
if os.path.isfile(pfile) and os.access(pfile, os.R_OK):
with open(pfile, 'rb') as fp:
filename = os.path.basename(getattr(fp, 'name', None))
file_content = fp.read()
else:
error_msg = "The file pointed: (%s) is not a file or is unreadable."
logger.error(error_msg, pfile)
raise IOError(error_msg)
file_path, is_temp_file = get_file_path(pfile, filename)
filename = os.path.basename(file_path)

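# Two-step upload for large files: stream the content with upload() first,
# then register an Asset that carries only the returned large_file_path.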
if os.path.getsize(file_path) > MAX_UPLOADED_FILE_SIZE:
chunks_generator = get_file_chunks(file_path)
large_file_path = self.stubAsset.upload(chunks_generator)
asset = opac_pb2.Asset(
file='',
filename=filename,
type=filetype,
metadata=json.dumps(metadata),
bucket=bucket_name,
large_file_path=large_file_path
)
if is_temp_file:
delete_temp_file(file_path)
return self.stubAsset.add_asset(asset).id

asset = opac_pb2.Asset(
file=file_content,
file=open(file_path, 'rb').read(),
filename=filename,
type=filetype,
metadata=json.dumps(metadata),
bucket=bucket_name
bucket=bucket_name,
large_file_path=''
)
if is_temp_file:
delete_temp_file(file_path)

return self.stubAsset.add_asset(asset).id

@@ -159,6 +223,13 @@ def get_asset(self, _id):
raise ValueError(msg)
try:
asset = self.stubAsset.get_asset(opac_pb2.TaskId(id=_id))
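# Two-step download: the asset record is fetched first; when large_file_path
# is set, the content is streamed back and reattached to asset.file.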
if asset.large_file_path:
out_file_name = save_tmpfile(asset.file)
response = self.stubAsset.download(
opac_pb2.Request(large_file_path=asset.large_file_path))
save_chunks_to_file(response, out_file_name)
asset.file = open(out_file_name, 'rb').read()
delete_temp_file(out_file_name)
except Exception as e:
logger.error(e)
return (False, {'error_message': e.details()})
@@ -314,8 +385,8 @@ def get_task_state(self, _id):

return task_state.state

def update_asset(self, uuid, pfile=None, filename=None, filetype=None, metadata=None,
bucket_name=None):
def update_asset(self, uuid, pfile=None, filename=None, filetype=None,
metadata=None, bucket_name=None):
"""
Update asset to SSM.
@@ -331,45 +402,32 @@ def update_asset(self, uuid, pfile=None, filename=None, filetype=None, metadata=
Raise ValueError if param uuid is not a str|unicode
"""

if not isinstance(uuid, six.string_types):
raise ValueError('Param "uuid" must be a str|unicode.')

update_params = {}

if self.stubAsset.exists_asset(opac_pb2.TaskId(id=uuid)):

update_params = {}
update_params['uuid'] = uuid

if not metadata:
update_params['metadata'] = json.dumps({})
elif not isinstance(metadata, dict):
metadata = metadata or {}
if not isinstance(metadata, dict):
error_msg = 'Param "metadata" must be a Dict or None.'
logger.exception(error_msg)
raise ValueError(error_msg)
else:
update_params['metadata'] = json.dumps(metadata)
update_params['metadata'] = json.dumps(metadata)

if pfile is not None:
if hasattr(pfile, 'read'):
if not filename:
error_msg = 'Param "filename" is required'
logger.exception(error_msg)
raise IOError(error_msg)
else:
filename = filename
file_content = pfile.read()
file_path, is_temp_file = get_file_path(pfile, filename)
filename = os.path.basename(file_path)

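# As in add_asset, a large file is streamed via upload() and only the
# returned large_file_path is stored; the inline file field stays empty.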
if os.path.getsize(file_path) > MAX_UPLOADED_FILE_SIZE:
chunks_generator = get_file_chunks(file_path)
update_params['large_file_path'] = self.stubAsset.upload(chunks_generator)
update_params['file'] = b''
else:
if os.path.isfile(pfile) and os.access(pfile, os.R_OK):
with open(pfile, 'rb') as fp:
filename = os.path.basename(getattr(fp, 'name', None))
file_content = fp.read()
else:
error_msg = "The file pointed: (%s) is not a file or is unreadable."
logger.error(error_msg, pfile)
raise IOError(error_msg)

update_params['file'] = file_content
update_params['file'] = open(file_path, 'rb').read()
if is_temp_file:
delete_temp_file(file_path)
update_params['filename'] = filename

if filetype: