Commit 2297d029 authored by katharina.berger

Initial commit
#!/usr/bin/env python
import getopt
import sys
import os
import time
import datetime
from pyesgf.search import SearchConnection
from checks import pid_check_dataset, check_citation, pid_check_children
from config import Config, DBCon, Log, Dataset
usage = """
Usage:
check_esgf_publication
Options:
-c, --config config_file:
Configuration file
Use the default '/esg/config/pub_check.conf', if not specified.
--check-pid-content:
Also check the facets in the PID record against the SearchAPI
--data-node data_node
Check only data of the specified data_node
--disable-citation-check:
Do not check the citation URLs
--disable-pid-check:
Do not check the PID files
--institute institute
Check only data of the specified institute
-l, --log logfile:
Location of the logfile
If not set, use the default location '/tmp'.
--local-search
Search on local index node only
--model model
Check only data of the specified model
--project project:
Check only data of the specified project
--replica
Check replicated data only. The default is to check originals.
--version version
Check only data of the specified version
"""
def main(argv):
try:
args, lastargs = getopt.getopt(argv, "c:l:", ['config=', 'project=', 'model=', 'institute=', 'data-node=',
'version=', 'local-search', 'replica', 'log=',
'disable-citation-check', 'disable-pid-check',
'check-pid-content'])
    except getopt.GetoptError as err:
        print err
        print usage
        sys.exit(0)
search_params = {}
search_params['replica'] = False
distributed_search = True
skip_citation = False
skip_pid = False
check_pid_content = False
config_file = '/esg/config/pub_check.conf'
#config_file = os.path.join(os.path.dirname(__file__).split('EGG-INFO')[0], 'conf_files/pub_check.cfg')
logfile_path = '/tmp'
for flag, arg in args:
if flag in ['-c', '--config']:
config_file = arg
elif flag == '--project':
search_params['project'] = arg
elif flag == '--model':
search_params['model'] = arg
elif flag == '--institute':
search_params['institution_id'] = arg
elif flag == '--data-node':
search_params['data_node'] = arg
elif flag == '--version':
search_params['version'] = 'v%s' % arg
elif flag == '--local-search':
distributed_search = False
elif flag == '--replica':
search_params['replica'] = True
elif flag in ['-l', '--log']:
logfile_path = arg
elif flag == '--disable-citation-check':
skip_citation = True
elif flag == '--disable-pid-check':
skip_pid = True
elif flag == '--check-pid-content':
check_pid_content = True
if not os.path.isfile(config_file):
print 'Not a valid file: %s' % config_file
sys.exit(0)
# get config, setup database connection and init logging
config = Config(config_file)
db_con = DBCon(config.db)
    project = search_params.get('project', 'none')
    logfile_name = os.path.join(logfile_path, '%s_check_%s.log' % (project, datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d')))
    log = Log(logfile_name, config.loglevel)
    print('Logging information will be written to %s' % logfile_name)
# check all published data, use specified search params
conn = SearchConnection('http://%s/esg-search' % config.index_node, distrib=distributed_search)
ctx = conn.new_context(**search_params)
if ctx.hit_count == 0:
print('No data found for search params: %s' % search_params)
for dataset in ctx.search():
ds = Dataset(dataset.json['master_id'], dataset.json['version'], dataset.json['data_node'], search_params['replica'])
try:
ds.set_pid(dataset.json['pid'][0])
except:
log.warning('PID not present: %s' % ds.master_id)
continue
# don't need to check datasets twice
if search_params['replica'] == True and db_con.replica_exists(ds.pid, ds.data_node):
continue
elif db_con.dataset_exists(ds.pid):
continue
# check PIDs
if skip_pid:
pid_valid = True
else:
pid_valid = pid_check_dataset(ds, config, log, check_pid_content, search_params)
# check citation
if skip_citation:
citation_valid = True
else:
try:
ds.set_citation(dataset.json['citation_url'][0])
citation_valid = check_citation(ds.citation, log)
except:
citation_valid = False
log.warning('Citation not present: %s' % ds.master_id)
if pid_valid and citation_valid:
if search_params['replica'] == True:
log.info('OK: Replica %s.v%s on node %s' % (ds.master_id, ds.version, ds.data_node))
db_con.add_replica(ds.master_id, ds.version, ds.pid, ds.data_node, ds.citation)
else:
log.info('OK: Original %s.v%s on node %s' % (ds.master_id, ds.version, ds.data_node))
db_con.add_dataset(ds.master_id, ds.version, ds.pid, ds.citation)
db_con.close()
log.close()
if __name__ == '__main__':
main(sys.argv[1:])
from pid_check import check_pid, pid_check_children, pid_check_dataset
from citation_check import check_citation
import json
import requests
def check_citation(citation_url, log):
    # fetch the citation record
    response = requests.get(citation_url)
    if not response.status_code == 200:
        log.warning('Citation not found: %s' % citation_url)
        return False
    else:
        # naive content check: look for a 'creators' entry and make sure it is not empty
        parts = response.content.split(',')
        for p in parts:
            if 'creators' in p:
                creators = p.split(':')[-1].strip()
                if len(creators) < 5:
                    return False
                break
        return True
import json
import requests
from config import File
from utils import FilePID, DatasetPID
from pyesgf.search import SearchConnection
def check_pid(pid, id, config, log, check_pid_content):
    # strip the 'hdl:' prefix to get the bare handle
    handle = pid[4:]
    # resolve the handle via the Handle REST API
    response = requests.get(config.handle_url + config.handle_api + handle)
if not response.status_code == 200:
log.warning('Cannot resolve PID: %s (%s)' % (id, pid))
return False, None
else:
if check_pid_content:
return True, response
return True, None
def pid_check_dataset(dataset, config, log, check_pid_content, search_params):
dataset_id_version = '%s.v%s' % (dataset.master_id, dataset.version)
dataset_pid_valid, response = check_pid(dataset.pid, dataset_id_version, config, log, check_pid_content)
# check children only in case dataset PID is present
if dataset_pid_valid:
search_params['pid'] = dataset.pid
children_pids_valid = pid_check_children(dataset, search_params, config, log, check_pid_content)
if children_pids_valid:
log.info('Dataset and File PIDs found for %s.' % dataset_id_version)
if check_pid_content:
dataset_pid_valid = check_dataset_pid_metadata(response, dataset, log)
return dataset_pid_valid and children_pids_valid
return False
def pid_check_children(dataset, search_params, config, log, check_pid_content):
children_pids_valid = True
conn = SearchConnection('http://%s/esg-search' % config.index_node)
ctx = conn.new_context(**search_params)
files = ctx.search()[0].file_context().search()
for f in files:
fs = File(f.checksum, f.filename, f.size, f.tracking_id, f.download_url)
dataset.add_file(fs)
current_child_valid, response = check_pid(f.tracking_id, f.filename, config, log, check_pid_content)
children_pids_valid = children_pids_valid and current_child_valid
if current_child_valid and check_pid_content:
success = check_file_pid_metadata(response, fs, dataset.pid, dataset.replica, log)
children_pids_valid = children_pids_valid and success
return children_pids_valid
def check_dataset_pid_metadata(response, dataset, log):
success = True
dataset_pid = DatasetPID(response)
if dataset_pid.metadata_complete:
if dataset_pid.aggregation_level != 'DATASET':
log.warning('Dataset - Aggregation_level mismatch: %s' % dataset.pid)
success = False
if dataset_pid.master_id != dataset.master_id:
log.warning('Dataset - drs_id mismatch: %s' % dataset.pid)
success = False
if dataset_pid.version != dataset.version:
log.warning('Dataset - version mismatch: %s' % dataset.pid)
success = False
if not dataset.replica:
if dataset.data_node not in dataset_pid.data_nodes:
log.warning('Dataset - data_node mismatch: %s' % dataset.master_id)
success = False
else:
if dataset.data_node not in dataset_pid.data_nodes_replica:
log.warning('Dataset - data_node mismatch: %s' % dataset.master_id)
success = False
        file_pids = set(f.pid for f in dataset.files)
        if set(dataset_pid.file_pids) != file_pids:
log.warning('Dataset - Children mismatch: %s' % dataset.master_id)
success = False
else:
log.warning('Dataset - Missing metadata: %s' % dataset.master_id)
success = False
return success
def check_file_pid_metadata(response, file_searchapi, dataset_pid, replica, log):
success = True
file_pid = FilePID(response)
if file_pid.metadata_complete:
if file_pid.aggregation_level != 'FILE':
log.warning('File - Aggregation_level mismatch: %s' % file_searchapi.pid)
success = False
if file_pid.size != str(file_searchapi.size):
log.warning('File - Size mismatch: %s' % file_searchapi.pid)
success = False
if file_pid.checksum != str(file_searchapi.checksum):
log.warning('File - Checksum mismatch: %s' % file_searchapi.pid)
success = False
if file_pid.filename != str(file_searchapi.filename):
log.warning('File - Filename mismatch: %s' % file_searchapi.pid)
success = False
        if not replica:
            if str(file_searchapi.download_url) not in file_pid.download_url:
                log.debug('%s not in %s' % (file_searchapi.download_url, file_pid.download_url))
                log.warning('File - URL mismatch: %s' % file_searchapi.pid)
                success = False
        else:
            if str(file_searchapi.download_url) not in file_pid.download_urls_replica:
                log.debug('%s not in %s' % (file_searchapi.download_url, file_pid.download_urls_replica))
                log.warning('File - URL mismatch: %s' % file_searchapi.pid)
                success = False
if dataset_pid not in file_pid.dataset_pids:
log.warning('File - Dataset mismatch: %s' % file_searchapi.pid)
success = False
else:
log.warning('File - Missing metadata: %s' % file_searchapi.filename)
success = False
return success
[DEFAULT]
INDEX_NODE = esgf-data.dkrz.de
HANDLE_URL = https://hdl.handle.net
HANDLE_API = /api/handles/
DB_LOCATION = /tmp/pid_citation_check.db
LOGLEVEL = INFO
from config import Config, DBCon, Log
from structs import File, Dataset
import sys
import sqlite3
from datetime import datetime
from ConfigParser import ConfigParser
class Config:
def __init__(self, config_file):
config = ConfigParser()
config.read(config_file)
self.config = config
self.index_node = self.get_config('INDEX_NODE')
self.handle_url = self.get_config('HANDLE_URL')
self.handle_api = self.get_config('HANDLE_API')
self.db = self.get_config('DB_LOCATION')
self.loglevel = self.get_config('LOGLEVEL')
def get_config(self, param):
try:
return self.config.get('DEFAULT', param)
except:
print 'Missing config option: %s' %param
sys.exit(0)
class DBCon:
def __init__(self, db_path):
self.db_path = db_path
self.conn = sqlite3.connect(self.db_path)
self.c = self.conn.cursor()
# create table for datasets and files, if not exists
self.c.execute('''
CREATE TABLE IF NOT EXISTS datasets(
id INTEGER PRIMARY KEY,
name VARCHAR(256) NOT NULL,
version INTEGER NOT NULL,
pid VARCHAR(256) NOT NULL,
citation VARCHAR(256))
''')
# create table for datasets and files, if not exists
self.c.execute('''
CREATE TABLE IF NOT EXISTS replica_datasets(
id INTEGER PRIMARY KEY,
name VARCHAR(256) NOT NULL,
version INTEGER NOT NULL,
pid VARCHAR(256) NOT NULL,
data_node VARCHAR(256) NOT NULL,
citation VARCHAR(256))
''')
self.commit()
def commit(self):
self.conn.commit()
def add_dataset(self, name, version, pid, citation):
self.c.execute('INSERT INTO datasets(name, version, pid, citation) VALUES (?,?,?,?)', [name, version, pid, citation])
self.commit()
def add_replica(self, name, version, pid, data_node, citation):
self.c.execute('INSERT INTO replica_datasets(name, version, pid, data_node, citation) VALUES (?,?,?,?,?)',
[name, version, pid, data_node, citation])
self.commit()
def dataset_exists(self, pid):
self.c.execute('SELECT id FROM datasets WHERE pid=?', [pid])
if self.c.fetchone():
return True
else:
return False
def replica_exists(self, pid, data_node):
self.c.execute('SELECT id FROM replica_datasets WHERE pid=? and data_node=?', [pid, data_node])
if self.c.fetchone():
return True
else:
return False
def close(self):
self.conn.close()
class Log:
def __init__(self, logfile_path, level):
self.logfile_path = logfile_path
self.logfile = self.init_logging()
self.level = level.upper()
def init_logging(self):
return open(self.logfile_path, 'a')
def close(self):
self.logfile.close()
def warning(self, message):
datetime_now = str(datetime.now()).split('.')[0]
msg = '%s - WARNING - %s\n' % (datetime_now, message)
self.logfile.write(msg)
# print(msg)
def debug(self, message):
        if self.level == 'DEBUG':
datetime_now = str(datetime.now()).split('.')[0]
msg = '%s - DEBUG - %s\n' % (datetime_now, message)
self.logfile.write(msg)
# print(msg)
def info(self, message):
        if self.level in ['DEBUG', 'INFO']:
datetime_now = str(datetime.now()).split('.')[0]
msg = '%s - INFO - %s\n' % (datetime_now, message)
self.logfile.write(msg)
# print(msg)
class File:
def __init__(self, checksum, filename, size, pid, download_url):
self.checksum = checksum
self.filename = filename
self.size = size
self.pid = pid
self.download_url = download_url
class Dataset:
def __init__(self, master_id, version, data_node, replica):
self.master_id = master_id
self.version = version
self.data_node = data_node
self.replica = replica
self.pid = None
self.citation = None
self.files = set()
def set_pid(self, pid):
self.pid = pid
def set_citation(self, citation):
self.citation = citation
def add_file(self, f):
self.files.add(f)
#!/usr/bin/env python
import os
import shutil
from setuptools import setup, find_packages
VERSION = '0.0'
conf_final_location = '/esg/config/pid_citation_check.conf'
if not os.path.exists(conf_final_location):
shutil.copyfile('./config.cfg', conf_final_location)
setup(
name='pid_citation_check',
version=VERSION,
description='Check PID and Citation of already published data.',
author='Katharina Berger',
author_email='berger@dkrz.de',
install_requires=[
"esgf-pyclient>=0.1.8",
],
packages=find_packages(),
data_files=[
('conf_files', ['config.cfg'])
],
include_package_data=True,
scripts=[
'check_publication',
],
    zip_safe=False,
)
from pid_utils import DatasetPID, FilePID
import json
def _lookup_metadata(pid_response, lookup_field):
    # extract the value of the entry with type == lookup_field from the
    # 'values' list of a Handle API JSON response; None if not present
    try:
        pid_values = json.loads(pid_response.content)['values']
        return next(item['data']['value'] for item in pid_values if item['type'] == lookup_field)
    except:
        return None
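# For orientation, a minimal sketch of the JSON a resolved handle is expected to return
# (the handle and values below are illustrative placeholders, not a real record):
#
#   {
#     "responseCode": 1,
#     "handle": "21.14100/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
#     "values": [
#       {"index": 1, "type": "AGGREGATION_LEVEL", "data": {"format": "string", "value": "DATASET"}},
#       {"index": 2, "type": "DRS_ID", "data": {"format": "string", "value": "some.project.dataset.id"}}
#     ]
#   }
#
# For such a record, _lookup_metadata(response, 'DRS_ID') returns 'some.project.dataset.id'.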
class FilePID:
def __init__(self, pid_response):
self.pid_response = pid_response
self.aggregation_level = str(_lookup_metadata(self.pid_response, 'AGGREGATION_LEVEL'))
self.size = str(_lookup_metadata(self.pid_response, 'FILE_SIZE'))
self.checksum = str(_lookup_metadata(self.pid_response, 'CHECKSUM'))
self.filename = str(_lookup_metadata(self.pid_response, 'FILE_NAME'))
try:
self.download_url = str(_lookup_metadata(self.pid_response, 'URL_ORIGINAL_DATA')).split('href="')[1].split('" /></locations>')[0].split('"')[0]
except:
self.download_url = None
try:
replica_urls = str(_lookup_metadata(self.pid_response, 'URL_REPLICA')).split('" /></locations>')[0].split('href="')
self.download_urls_replica = set()
for url in replica_urls:
self.download_urls_replica.add(url.split('"')[0])
except:
self.download_urls_replica = None
self.dataset_pids = str(_lookup_metadata(self.pid_response, 'IS_PART_OF')).split(';')
        # at least one of the original or replica download URL(s) must be present
        dn_present = any([self.download_url, self.download_urls_replica])
        # complete only if all required fields could be extracted
        self.metadata_complete = all([self.aggregation_level, self.size, self.checksum, self.filename, dn_present, self.dataset_pids])
class DatasetPID:
def __init__(self, pid_response):
self.pid_response = pid_response
self.aggregation_level = str(_lookup_metadata(self.pid_response, 'AGGREGATION_LEVEL'))
self.master_id = str(_lookup_metadata(self.pid_response, 'DRS_ID'))
self.version = str(_lookup_metadata(self.pid_response, 'VERSION_NUMBER'))
try:
self.data_nodes = str(_lookup_metadata(self.pid_response, 'HOSTING_NODE')).split('" /></locations>')[0].split('host="')[1].split('"')
except:
self.data_nodes = None
try:
self.data_nodes_replica = str(_lookup_metadata(self.pid_response, 'REPLICA_NODE')).split('" /></locations>')[0].split('host="')[1].split('"')
except:
self.data_nodes_replica = None
self.file_pids = str(_lookup_metadata(self.pid_response, 'HAS_PARTS')).split(';')
        # at least one hosting or replica node must be present
        dn_present = any([self.data_nodes, self.data_nodes_replica])
        # complete only if all required fields could be extracted
        self.metadata_complete = all([self.aggregation_level, self.master_id, self.version, dn_present, self.file_pids])