# Copyright (c) 2018 The Ontario Institute for Cancer Research. All rights
# reserved.
#
# This program and the accompanying materials are made available under the
# terms of the GNU Public License v3.0. You should have received a copy of
# the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING,BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import os
import re
import time
from enum import unique, Enum
import logging
from overture_song.client import Api, StudyClient
from overture_song.utils import SongClientException, check_file, check_dir, check_song_state
from overture_song.model import ApiConfig, SongError
from overture_song.entities import *
import json
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("song.tools")
[docs]class BatchUploader(object):
def __init__(self, server_url, access_token, payload_dir, debug=False):
# Dependencies
self.payload_dir = os.path.realpath(payload_dir)
# Config
self.server_url = server_url
self.debug = debug
self.access_token = access_token
self.is_async_validation = True
self.ignore_analysis_id_collisions = True
# State
self.__upload_status_map = {}
self.__api = None
# Check
check_dir(self.payload_dir)
def __calc_depth(self, root_path):
rel_root_path = os.path.realpath(root_path).replace(self.payload_dir, '')
return len(rel_root_path.split(os.sep)) - 1
def __use_root_dir(self, root_dir):
is_study_level = self.__calc_depth(root_dir) == 1
study_id_candidate = root_dir.split(os.sep)[-1]
is_not_hidden_dir = re.search('^\.', study_id_candidate) is None
return is_not_hidden_dir and is_study_level, study_id_candidate
def __build_api_config(self, study_id):
return ApiConfig(self.server_url, study_id, self.access_token, debug=self.debug)
def __build_api(self, study_id):
return Api(self.__build_api_config(study_id))
def __build_study_client(self, study_id):
return StudyClient(self.__build_api(study_id))
def __setup_study(self, study_id):
study_client = self.__build_study_client(study_id)
if not study_client.has(study_id):
study = Study.create(study_id, name="N/A", organization="ICGC", description="None")
study_client.create(study)
[docs] def upload_all(self):
for root, dirs, files in os.walk(self.payload_dir):
use_root_dir, study_id = self.__use_root_dir(root)
if use_root_dir:
print("study_id = {} files= {} ".format(study_id, files))
self.__setup_study(study_id)
filtered_file_list = list(filter(lambda f: f.endswith('.json'), files))
total_size = len(filtered_file_list)
file_count = 0
for file in filtered_file_list:
filename = root+os.sep+file
file_upload_obj = self.get_file(study_id, filename)
file_upload_obj.upload()
file_count += 1
print("Uploaded ( {} / {} ) [{}]: {}".format(file_count, total_size, study_id, filename))
[docs] def status_all(self):
for root, dirs, files in os.walk(self.payload_dir):
use_root_dir, study_id = self.__use_root_dir(root)
if use_root_dir:
filtered_file_list = list(filter(lambda f: f.endswith('.json'), files))
total_size = len(filtered_file_list)
file_count = 0
# TODO: refactor this repetition out
for file in filtered_file_list:
filename = root+os.sep+file
file_upload_obj = self.get_file(study_id, filename)
file_upload_obj.update_status()
file_count += 1
print("Status Checked ( {} / {} ) [{}]: {}".format(file_count, total_size, study_id, filename))
[docs] def save_all(self):
for root, dirs, files in os.walk(self.payload_dir):
use_root_dir, study_id = self.__use_root_dir(root)
if use_root_dir:
filtered_file_list = list(filter(lambda f: f.endswith('.json'), files))
total_size = len(filtered_file_list)
file_count = 0
for file in filtered_file_list:
filename = root+os.sep+file
file_upload_obj = self.get_file(study_id, filename)
file_upload_obj.save()
file_count += 1
print("Saved ( {} / {} ) [{}]: {}".format(file_count, total_size, study_id, filename))
[docs] def publish_all(self):
for root, dirs, files in os.walk(self.payload_dir):
use_root_dir, study_id = self.__use_root_dir(root)
if use_root_dir:
filtered_file_list = list(filter(lambda f: f.endswith('.json'), files))
total_size = len(filtered_file_list)
file_count = 0
for file in filtered_file_list:
filename = root+os.sep+file
file_upload_obj = self.get_file(study_id, filename)
file_upload_obj.publish()
file_count += 1
print("Published ( {} / {} ) [{}]: {}".format(file_count, total_size, study_id, filename))
[docs] def get_file(self, study_id, filename):
if study_id in self.__upload_status_map:
if filename in self.__upload_status_map[study_id]:
return self.__upload_status_map[study_id][filename]
else:
self.__upload_status_map[study_id] = {}
api = self.__build_api(study_id)
file_upload = FileUploadClient(
api, filename,
is_async_validation=self.is_async_validation,
ignore_analysis_id_collisions=self.ignore_analysis_id_collisions)
self.__upload_status_map[study_id][filename] = file_upload
return file_upload
[docs] def get_studies(self):
return self.__upload_status_map.keys()
[docs] def get_files(self, study_id):
if study_id in self.__upload_status_map:
return list(self.__upload_status_map[study_id].values())
return []
[docs] def get_all_files(self):
out = []
for study_id in self.get_studies():
out = out + self.get_files(study_id)
return out
[docs] def print_upload_states(self):
for file_upload in self.get_all_files():
print("{}\t{}\t{}".format(file_upload.study_id, file_upload.upload_state, file_upload.filename))
[docs]@unique
class FileUploadState(Enum):
UPLOAD_ERROR = -6
STATUS_ERROR = -5
VALIDATION_ERROR = -4
SAVE_ERROR = -3
PUBLISH_ERROR = -2
UNKNOWN_ERROR = -1
NOT_UPLOADED = 0
SUBMITTED = 1
VALIDATED = 2
SAVED = 3
PUBLISHED = 4
def __equals(self, other):
return self.value == other.value
def __lt__(self, other):
check_type(other, FileUploadState)
return self.value < other.value
def __gt__(self, other):
return other.__lt__(self)
def __eq__(self, other):
check_type(other, FileUploadState)
return self.__equals(other)
def __le__(self, other):
return self.__lt__(other) or self.__equals(other)
def __ge__(self, other):
return other.__le__(self)
def __str__(self):
return super(FileUploadState, self).__str__().split('.')[1]
[docs]class FileUploadClient(object):
def __init__(self, api, filename, is_async_validation=False, ignore_analysis_id_collisions=False):
# Dependencies
self.__api = api
self.filename = filename
self.study_id = api.config.study_id
# Config
self.is_async_validation = is_async_validation
self.ignore_analysis_id_collisions = ignore_analysis_id_collisions
self.retry_period_seconds = 0.1
# State
self.upload_state = FileUploadState.NOT_UPLOADED
self.upload_status = None
self.upload_id = None
self.upload_errors = None
self.analysis_id = None
# Check
check_type(api, Api)
check_file(filename)
[docs] def upload(self):
if self.upload_state > FileUploadState.NOT_UPLOADED:
log.warning("The file '{}' has already been uploaded".format(self.filename))
else:
with open(self.filename, 'r') as file_content:
json_data = json.load(file_content) # just to validate the json
try:
upload_response = self.__api.upload(json_data, is_async_validation=self.is_async_validation)
check_song_state(upload_response.status == 'ok' or 'WARNING' in upload_response.status,
'file.upload.fail', "The upload for file '{}' was unsuccessful", self.filename)
self.upload_state = FileUploadState.SUBMITTED
self.upload_id = upload_response.uploadId
except SongClientException as se:
self.upload_errors = "[SONG_CLIENT_EXCEPTION] {} @ {} : {}".format(se.error_id,
se.timestamp, se.message)
self.upload_state = FileUploadState.UPLOAD_ERROR
except SongError as ex:
self.upload_errors = ex
self.upload_state = FileUploadState.UPLOAD_ERROR
except Exception as e:
self.upload_errors = "[{}] : {}".format(e.__class__.__name__, e)
self.upload_state = FileUploadState.UNKNOWN_ERROR
[docs] def update_status(self):
if self.upload_state == FileUploadState.VALIDATION_ERROR:
log.error("Validation error for file '{}' with upload_id '{}': {}".format(self.filename,
self.upload_id,
self.upload_errors))
elif self.upload_state == FileUploadState.NOT_UPLOADED:
log.warning("Status undefined for file '{}' as it was not uploaded".format(self.filename))
elif self.upload_state == FileUploadState.SUBMITTED:
# actually calculate the status
try:
status_response = self.__api.status(self.upload_id)
while status_response.state == 'CREATED' or status_response.state == 'UPDATED':
status_response = self.__api.status(self.upload_id)
time.sleep(self.retry_period_seconds)
if status_response.state == 'VALIDATED':
self.upload_state = FileUploadState.VALIDATED
elif status_response.state == 'SAVED':
self.upload_state = FileUploadState.SAVED
elif status_response.state == 'PUBLISHED':
self.upload_state = FileUploadState.PUBLISHED
elif status_response.state == 'VALIDATION_ERROR':
self.upload_state = FileUploadState.VALIDATION_ERROR
self.upload_errors = status_response.errors
else:
self.upload_state = FileUploadState.UNKNOWN_ERROR
except SongClientException as se:
self.upload_errors = "[SONG_CLIENT_EXCEPTION] {} @ {} : {}".format(se.error_id,
se.timestamp, se.message)
self.upload_state = FileUploadState.STATUS_ERROR
except SongError as ex:
self.upload_errors = ex
self.upload_state = FileUploadState.STATUS_ERROR
except Exception as e:
self.upload_errors = "[{}] : {}".format(e.__class__.__name__, e)
self.upload_state = FileUploadState.UNKNOWN_ERROR
elif FileUploadState.VALIDATED < self.upload_state < FileUploadState.PUBLISHED:
log.info(
"The file '{}' with upload_id '{}' has already been validated and has state '{}'".format(
self.filename, self.upload_id, self.upload_state.__class__.__name__))
[docs] def save(self):
check_state(self.upload_state >= FileUploadState.VALIDATED,
"Need to VALIDATE upload_id '{}' for file '{}' before SAVING",
self.upload_id, self.filename)
if self.upload_state >= FileUploadState.SAVED:
log.warning("The file '{}' with upload_id '{}' was already saved with analysis_id '{}'".format(
self.filename, self.upload_id, self.analysis_id))
else:
try:
save_response = self.__api.save(self.upload_id,
ignore_analysis_id_collisions=self.ignore_analysis_id_collisions)
check_state(save_response.status == 'ok',
"The save for upload_id '{}' for file '{}' was unsuccessfull: {}",
self.upload_id, self.filename, save_response.__dict__)
self.upload_state = FileUploadState.SAVED
self.analysis_id = save_response.analysisId
except SongClientException as se:
self.upload_errors = "[SONG_CLIENT_EXCEPTION] {} @ {} : {}".format(se.error_id,
se.timestamp, se.message)
self.upload_state = FileUploadState.SAVE_ERROR
except SongError as ex:
self.upload_errors = ex
self.upload_state = FileUploadState.SAVE_ERROR
except Exception as e:
self.upload_errors = "[{}] : {}".format(e.__class__.__name__, e)
self.upload_state = FileUploadState.UNKNOWN_ERROR
[docs] def publish(self):
check_state(self.upload_state >= FileUploadState.SAVED,
"Need to SAVE upload_id '{}' for file '{}' before PUBLISHING",
self.upload_id, self.filename)
if self.upload_state >= FileUploadState.PUBLISHED:
log.warning("The file '{}' with upload_id '{}' was already published with analysis_id '{}'".format(
self.filename, self.upload_id, self.analysis_id))
else:
try:
publish_response = self.__api.publish(self.analysis_id)
check_state(publish_response.status == 'ok',
"The publish for analysis_id '{}' for file '{}' and upload_id '{}' was unsuccessfull: {}",
self.analysis_id, self.filename, self.upload_id, publish_response.__dict__)
self.upload_state = FileUploadState.PUBLISHED
except SongClientException as se:
self.upload_errors = "[SONG_CLIENT_EXCEPTION] {} @ {} : {}".format(se.error_id,
se.timestamp, se.message)
self.upload_state = FileUploadState.PUBLISH_ERROR
except SongError as ex:
self.upload_errors = ex
self.upload_state = FileUploadState.PUBLISH_ERROR
except Exception as e:
self.upload_errors = "[{}] : {}".format(e.__class__.__name__, e)
self.upload_state = FileUploadState.UNKNOWN_ERROR
[docs]@dataclass
class SimplePayloadBuilder(object):
donor: Type[Donor]
specimen: Type[Specimen]
sample: Type[Sample]
files: List[File]
experiment: object
analysisId: str = None
def __post_init__(self):
self._analysisType = None
check_state(self.donor is not None, "donor must be defined")
check_state(self.specimen is not None, "specimen must be defined")
check_state(self.sample is not None, "sample must be defined")
check_state(self.experiment is not None, "experiment must be defined")
check_type(self.donor, Donor)
check_type(self.specimen, Specimen)
check_type(self.sample, Sample)
check_type(self.files, list)
check_state(len(self.files) > 0, "Must have atleast one file for the upload payload")
for f in self.files:
check_type(f, File)
if isinstance(self.experiment, SequencingRead):
self._is_seq_read = True
self._analysisType = "sequencingRead"
elif isinstance(self.experiment, VariantCall):
self._is_seq_read = False
self._analysisType = "variantCall"
[docs] def to_dict(self):
composite_entity = CompositeEntity.create(self.donor, self.specimen, self.sample)
if self._is_seq_read:
analysis = SequencingReadAnalysis()
check_type(self.experiment, SequencingRead)
else:
analysis = VariantCallAnalysis()
check_type(self.experiment, VariantCall)
analysis.experiment = self.experiment
analysis.sample.append(composite_entity)
analysis.analysisType = self._analysisType
analysis.file.extend(self.files)
if self.analysisId is not None and self.analysisId:
analysis.analysisId = self.analysisId
out_dict = analysis.to_dict()
out_dict.pop('study')
if self.analysisId is None:
out_dict.pop('analysisId')
return out_dict