Commit da3079e0 authored by pavan kumar.siligam

fixed inactive session timeout issue

parent 853cf28b
#!/usr/bin/env python
"""
python interface to HPSS pftp
"""
import argparse
import os
import sys
import getpass
import netrc
import ftplib
import posixpath
import re
import datetime
import json
from collections import OrderedDict
from netCDF4 import Dataset
from subprocess import Popen, PIPE
from contextlib import contextmanager
HOST = 'tape.dkrz.de'
PORT = 4021
@contextmanager
def ignore_anyerror():
    "try anything while ignoring the exception"
    try:
        yield
    except:
        pass
@contextmanager
def timeit():
    start_time = datetime.datetime.now()
    yield
    end_time = datetime.datetime.now()
    print("Elapsed: {!s}".format(end_time - start_time))
@contextmanager
def temporary_change_dir(func, oldpath, newpath):
    "switch to newpath via func, restoring oldpath on exit"
    func(newpath)
    yield
    func(oldpath)
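# A minimal usage sketch for the context managers above (the path is illustrative):
#
#     with temporary_change_dir(os.chdir, os.getcwd(), '/tmp'):
#         with timeit():
#             pass  # do work inside /tmp; the old cwd is restored and the elapsed time printed on exit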
def _create_netrc(username=None, password=None, skip_header=True):
    "write a ~/.netrc entry for HOST with restrictive permissions"
    username = username or getpass.getuser()
    userhome = os.environ.get('HOME')
    password = password or getpass.getpass()
    header = "# machine <mname> login <uname> password <password>"
    entry_fmt = "machine {0} login {1} password {2}"
    entry = entry_fmt.format(HOST, username, password)
    if skip_header:
        entries = "\n".join((entry, ''))
    else:
        entries = "\n".join((header, entry, ''))
    fname = os.path.join(userhome, '.netrc')
    with open(fname, 'w') as fid:
        fid.writelines(entries)
    os.chmod(fname, 0o600)
    return
def connect(username=None, password=None):
    "connect and log in to the HPSS ftp server, reading ~/.netrc credentials if needed"
    ftp_obj = ftplib.FTP()
    ftp_obj.connect(HOST, PORT)
    username = username or getpass.getuser()
    account = None
    if password is None:
        try:
            n = netrc.netrc()
        except IOError:
            _create_netrc(username, skip_header=False)
            n = netrc.netrc()
        credentials = n.hosts.get(HOST)
        if credentials is None:
            _create_netrc(username, password)
            n = netrc.netrc()
            credentials = n.hosts.get(HOST)
        username, account, password = credentials
    ftp_obj.login(username, password, account)
    return ftp_obj
_msg_re = re.compile(r'^[0-9]+-?\s?[A-Za-z].*').match
def filter_msg(lines):
    "yield listing lines, skipping FTP status/message lines"
    try:
        lines = lines.splitlines()
    except AttributeError:
        pass
    for line in lines:
        if line and not _msg_re(line):
            yield line
def msgs(lines):
    "yield only the FTP status/message lines"
    try:
        lines = lines.splitlines()
    except AttributeError:
        pass
    for line in lines:
        if _msg_re(line):
            yield line
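# Sketch of how the two generators above split a multi-line FTP reply
# (the reply text is illustrative only):
#
#     reply = "213-Status follows:\n-rw-r----- 1 user grp 42 Jan 01 12:00 a.nc\n213 End"
#     list(msgs(reply))        # -> ['213-Status follows:', '213 End']
#     list(filter_msg(reply))  # -> ['-rw-r----- 1 user grp 42 Jan 01 12:00 a.nc']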
INDENT = 4
def meta_collect(fname, hpss_options=None, ospath=os.path, datetime=datetime):
    "collect file stats and netCDF header metadata for fname into an OrderedDict"
    Basename = ospath.basename
    # Dirname = ospath.dirname
    Abspath = ospath.abspath
    nc = Dataset(fname)
    mdata = OrderedDict()
    mdata['filename'] = Basename(fname)
    mdata['mistral_path'] = Abspath(fname)
    if hpss_options is not None:
        if isinstance(hpss_options, dict):
            mdata.update(hpss_options)
    _stats = os.stat(fname)
    mdata['size'] = _stats.st_size
    mdata['format'] = nc.data_model
    # mdata['access'] = datetime.datetime.strftime(
    #     datetime.datetime.fromtimestamp(_stats.st_atime), "%d-%m-%YT%T")
    mdata['modify'] = datetime.datetime.strftime(
        datetime.datetime.fromtimestamp(_stats.st_mtime), "%d-%m-%YT%T")
    # mdata['change'] = datetime.datetime.strftime(
    #     datetime.datetime.fromtimestamp(_stats.st_ctime), "%d-%m-%YT%T")
    _metadata = OrderedDict()
    # DIMENSIONS
    _dimensions = OrderedDict()
    for entry in nc.dimensions.values():
        _dimensions[entry.name] = len(entry)
    # VARIABLES
    _variables = []
    for vname, variable in nc.variables.items():
        _varmeta = OrderedDict()
        _varmeta['name'] = vname
        _varmeta['dtype'] = str(variable.dtype)
        _varmeta['dimensions'] = "({})".format(", ".join(variable.dimensions))
        for ncattr in variable.ncattrs():
            _varmeta[ncattr] = str(getattr(variable, ncattr))
        _variables.append(_varmeta)
    # global attrs
    _global_attrs = OrderedDict()
    for ncattr in nc.ncattrs():
        _global_attrs[ncattr] = str(getattr(nc, ncattr))
    _metadata['dimensions'] = _dimensions
    _metadata['variables'] = _variables
    _metadata['global attributes'] = _global_attrs
    mdata['metadata'] = _metadata
    nc.close()
    return mdata
def meta_load(filename):
    with open(filename) as fid:
        data = json.load(fid, object_pairs_hook=OrderedDict)
    return data
def meta_save(filename, collection):
    with open(filename, "w") as fid:
        json.dump(collection, fid, indent=INDENT)
def meta_append(collection, entry):
    "replace the record matching entry['filename'] in collection, or append it"
    target = entry['filename']
    # find existing entries
    for index, record in enumerate(collection):
        if record['filename'] == target:
            break
    else:
        index = None
    if index is not None:
        # we have found an entry
        print('found existing metadata entry for %s' % target)
        old_entry = collection.pop(index)
        collection.insert(index, entry)
    else:
        old_entry = None
        collection.append(entry)
    return old_entry
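# Sketch of the metadata round-trip built from the helpers above
# (filenames are hypothetical):
#
#     collection = meta_load('archive.json') if os.path.exists('archive.json') else []
#     entry = meta_collect('sample.nc', hpss_options={'hpss_path': '/hpss/arch/bm0834/sample.nc'})
#     meta_append(collection, entry)
#     meta_save('archive.json', collection)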
class pftp(object):
    _HOST = HOST
    _PORT = PORT
    ROOT = "/hpss/arch"
    PROJ = "bm0834"
    # blocksize = 16777216
    blocksize = 262144
    os_path_join = staticmethod(os.path.join)
    def __init__(self, username=None, password=None):
        self.username = username = username or getpass.getuser()
        self.ftp_obj = connect(username, password)
        # self.cwd(self.TOPDIR)
        # try:
        #     self.cwd(self.HOME)
        # except ftplib.error_temp:
        #     self.cwd(self.TOPDIR)
        # except:
        #     print("working directory: {}".format(self.pwd()))
        #     pass
    @property
    def HOME(self):
        return self.os_path_join(self.TOPDIR, self.username)
    @property
    def TOPDIR(self):
        return self.os_path_join(self.ROOT, self.PROJ)
    def __repr__(self):
        s = ("<{0.__class__.__name__} connected to "
             "{0._HOST}:{0._PORT} "
             "as {0.username}>")
        return s.format(self)
    def quit(self):
        self.ftp_obj.quit()
    def close(self):
        self.quit()
        self.ftp_obj.close()
    # ----- information related ------
    def pwd(self):
        return self.ftp_obj.pwd()
    def cwd(self, path):
        self.ftp_obj.cwd(path)
        print("working directory: {}".format(self.pwd()))
        return self.pwd()
    # def exists(self, pathname):
    #     "Returns true if pathname exists"
    #     pwd = self.pwd()
    #     try:
    #         self.cwd(pathname)
    #     except ftplib.error_perm as e:
    #         if 'No such file or directory' in e.message:
    #             exists = False
    #         elif 'Permission denied' in e.message:
    #             exists = True
    #         elif "550 Could not get list attributes" in e.message:
    #             exists = False
    #         else:
    #             raise e
    #     finally:
    #         self.cwd(pwd)
    #     return exists
    def exists(self, path):
        "Returns True if path exists"
        try:
            result = self.stat(path)
        except ftplib.all_errors:
            result = None
        return bool(result)
    def isfile(self, pathname):
        "Returns True if pathname refers to an existing file"
        _stat = self.stat(pathname)
        if _stat is None:
            raise ValueError("path does not exist: %s" % pathname)
        line = _stat.pop(0)
        if line[0] == '-':
            return True
        if line[0] == 'l':
            # a symlink counts as a file if it cannot be entered as a directory
            pwd = self.pwd()
            name = line.split()[-1]
            _flag = False
            try:
                self.cwd(name)
            except:
                _flag = True
            finally:
                self.cwd(pwd)
            return _flag
        return False
    def isdir(self, pathname):
        "Returns True if pathname refers to an existing directory"
        _stat = self.stat(pathname)
        if _stat is None:
            raise ValueError("path does not exist: %s" % pathname)
        line = _stat.pop(0)
        if line[0] == 'd':
            return True
        if line[0] == 'l':
            # a symlink counts as a directory if it can be entered
            pwd = self.pwd()
            name = line.split()[-1]
            try:
                self.cwd(name)
                _flag = True
            except ftplib.error_perm:
                _flag = False
            finally:
                self.cwd(pwd)
            return _flag
        return False
    def islink(self, pathname):
        "Returns True if pathname refers to a symbolic link"
        _stat = self.stat(pathname)
        if _stat is None:
            raise ValueError("path does not exist: %s" % pathname)
        line = _stat.pop(0)
        if line[0] == 'l':
            return True
        return False
    def stat(self, pathname):
        "Returns stat of the path"
        try:
            _stat = self.ftp_obj.sendcmd('STAT {}'.format(pathname))
        except ftplib.error_perm:
            return None
        return list(filter_msg(_stat))
    def size(self, pathname):
        "Returns size of path in bytes"
        return int(self.stat(pathname)[0].split()[6])
    # def dir(self, path='.'):
    #     result = []
    #     self.ftp_obj.dir(path, result.append)
    #     return result
    # def _files(self, path='.'):
    #     result = []
    #     listing = self.dir(path)
    #     for line in listing:
    #         if line.startswith('-'):
    #             result.append(line.split()[-1])
    #             continue
    #         if line.startswith('l'):
    #             fname = line.splitlines()[-1]
    #             if self.isfile(fname):
    #                 result.append(fname)
    #     return result
    def listing(self, path=None):
        path = path or self.pwd()
        return self.ftp_obj.nlst(path)
    def files(self, path=None):
        if path is None:
            path = self.pwd()
        result = []
        _listing = self.listing(path)
        for entry in _listing:
            if self.isfile(entry):
                result.append(entry)
        return result
    def directories(self, path=None):
        if path is None:
            path = self.pwd()
        result = []
        _listing = self.listing(path)
        for entry in _listing:
            if self.isdir(entry):
                result.append(entry)
        return result
    def walk(self, directory):
        """Traverse the directory, yielding a (topdir, subdirs, files) tuple
        for each level as it traverses. Similar to os.walk."""
        _files = self.files(directory)
        _subdirs = self.directories(directory)
        yield (directory, _subdirs, _files)
        for subdir in _subdirs:
            for x in self.walk(subdir):
                yield x
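    # Usage sketch for walk() (the remote path is hypothetical):
    #
    #     p = pftp()
    #     for topdir, subdirs, files in p.walk('/hpss/arch/bm0834/k202101'):
    #         print(topdir, len(subdirs), len(files))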
    # ========== MANIPULATIONS ==========
    def mkdir(self, pathname):
        "Make directory"
        self.ftp_obj.mkd(pathname)
        print("CREATED DIRECTORY: %s" % pathname)
        return
    def makedirs(self, pathname):
        """Recursively create directories as required walking up
        to an existing parent directory"""
        try:
            _isdir = self.isdir(pathname)
        except ValueError:
            pass
        else:
            if _isdir:
                return
        dirname = posixpath.dirname(pathname)
        if dirname:
            self.makedirs(dirname)
        self.mkdir(pathname)
        return
    def rmdir(self, directory):
        "Remove directory"
        self.ftp_obj.rmd(directory)
        print("REMOVED DIRECTORY: %s" % directory)
        return None
    def removedirs(self, path):
        "Recursively remove directory tree"
        remove = self.remove
        rmdir = self.rmdir
        path = posixpath.normpath(path)
        dirs = set()
        for topdir, subdirs, files in self.walk(path):
            for _file in files:
                remove(_file)
            if subdirs:
                for sub in subdirs:
                    dirs.add(sub)
            dirs.add(topdir)
        dirs = sorted(dirs, key=lambda x: len(x.split(posixpath.sep)), reverse=True)
        for d in dirs:
            rmdir(d)
        return None
    def remove(self, filename):
        "deletes the file"
        self.ftp_obj.delete(filename)
        print('DELETED FILE: %s' % filename)
        return
    # ========== FILE-TRANSFERS ==========
    def downloadfile(self, source, destination):
        """Download a file from ftp server to localhost.
        source<REMOTE:ftp> ==> destination<LOCAL:localhost>
        example:
        >>> # existing file on ftp server
        >>> source = '/hpss/arch/bm0834/k202101/sample.nc'
        >>> # filename with existing dirname on localhost
        >>> destination = '/tmp/sample.nc'
        >>> f = pftp()
        >>> f.downloadfile(source, destination)
        """
        sfile = posixpath.basename(source)
        if os.path.isdir(destination):
            dfile = os.path.join(destination, sfile)
        else:
            dfile = destination
        if os.path.exists(dfile):
            raise ValueError('file already exists at destination.')
        with open(dfile, 'wb') as filehandle:
            self.ftp_obj.retrbinary("RETR {}".format(source), filehandle.write, self.blocksize)
        return (source, destination)
    def downloadtree(self, source, destination):
        """Recursively copy source-directory-tree from remote ftp server
        to destination directory on localhost.
        The contents of source, along with the source basename, are downloaded.
        At destination, if a directory with the same name as source (basename
        only, not the full path) exists, it is updated.
        examples:
        --------
        1. source='/outgoing/user/docs/', destination='/tmp/'
           - '/tmp/docs' DOES NOT EXIST at destination. downloadtree will create
             it and download the contents of '/outgoing/user/docs/' to this
             folder.
           - '/tmp/docs' EXISTS at destination. downloadtree will download
             the contents, overwriting similar files.
        """
        source = source.rstrip('/')
        destination = destination.rstrip('/')
        s_basename = posixpath.basename(source)
        d_basename = posixpath.basename(destination)
        if s_basename != d_basename:
            destination = self.os_path_join(destination, s_basename)
        destination = os.path.normpath(destination)
        with ignore_anyerror():
            os.makedirs(destination)
        source_relative_path = re.compile('^{}/?'.format(source))
        _results = []
        _result_append = _results.append
        downloadfile = self.downloadfile
        for current_dir, subdirs, files in self.walk(source):
            for _subdir in subdirs:
                dest_dir = self.os_path_join(destination,
                                             source_relative_path.sub('', _subdir))
                with ignore_anyerror():
                    os.mkdir(dest_dir)
            for _filename in files:
                dest_file = self.os_path_join(destination,
                                              source_relative_path.sub('', _filename))
                downloadfile(_filename, dest_file)
                _result_append((_filename, dest_file))
        return _results
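    # Usage sketch for downloadtree() (paths are hypothetical):
    #
    #     p = pftp()
    #     p.downloadtree('/hpss/arch/bm0834/k202101/docs', '/tmp')  # -> /tmp/docs/...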
    def uploadfile(self, source, destination=None):
        """Upload a file from localhost to the ftp server using the external
        `pftp` client (pput). Returns (source, destination file path)."""
        src_dir = os.path.dirname(source)
        src_file = os.path.basename(source)
        if destination is None:
            destination = self.pwd()
        else:
            if not self.exists(destination):
                self.makedirs(destination)
        dst_dir = destination
        src_pwd = os.getcwd()
        # dst_pwd = self.pwd()
        with temporary_change_dir(os.chdir, src_pwd, src_dir):
            # with temporary_change_dir(self.cwd, dst_pwd, dst_dir):
            with timeit():
                client = Popen('pftp', stdin=PIPE, stdout=PIPE, stderr=PIPE)
                client.communicate("prompt \ncd {}\npput {}\n".format(dst_dir, src_file))
                client.wait()
        return source, os.path.join(destination, os.path.basename(source))
    # def uploadfile(self, source, destination):
    #     """Upload a file from localhost to ftp server.
    #     `source`: file path on localhost
    #     `destination`: file path on ftp server
    #     source<LOCAL:localhost> ==> destination<REMOTE:ftp>
    #     """
    #     presentdir = self.pwd()
    #     self.cwd(destination)
    #     # if self.isdir(destination):
    #     #     destination = posixpath.join(destination, os.path.basename(source))
    #     destination = os.path.basename(source)
    #     with open(source, 'rb') as filehandle:
    #         self.ftp_obj.storbinary("STOR {}".format(destination), filehandle, self.blocksize)
    #     self.cwd(presentdir)
    #     return (source, destination)
    def uploadtree(self, source, destination):
        """Recursively copy source-tree from localhost to destination directory
        on remote ftp server.
        """
        source = source.rstrip('/')
        destination = destination.rstrip('/')
        s_basename = posixpath.basename(source)
        d_basename = posixpath.basename(destination)
        if s_basename != d_basename:
            destination = self.os_path_join(destination, s_basename)
        destination = os.path.normpath(destination)
        with ignore_anyerror():
            self.makedirs(destination)
        source_relative_path = re.compile('^{}/?'.format(source))
        _results = []
        _result_append = _results.append
        uploadfile = self.uploadfile
        # walk the *local* source tree; self.walk would traverse the remote server
        for current_dir, subdirs, files in os.walk(source):
            for _subdir in subdirs:
                dest_dir = self.os_path_join(
                    destination, source_relative_path.sub('', os.path.join(current_dir, _subdir)))
                with ignore_anyerror():
                    self.mkdir(dest_dir)
            for _filename in files:
                local_file = os.path.join(current_dir, _filename)
                dest_file = self.os_path_join(
                    destination, source_relative_path.sub('', local_file))
                # uploadfile expects the destination *directory* on the server
                uploadfile(local_file, posixpath.dirname(dest_file))
                _result_append((local_file, dest_file))
        return _results
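    # Usage sketch for uploadtree() (paths are hypothetical):
    #
    #     p = pftp()
    #     p.uploadtree('/work/bm0834/user/docs', '/hpss/arch/bm0834/user')  # -> .../user/docs/...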
def xupload(source, destination):
    """Upload a single file to HPSS via the external `pftp` client and
    verify the transferred size against the local file size."""
    src_dir = os.path.dirname(source)
    src_file = os.path.basename(source)
    print('-' * 70)
    p = pftp()
    if not p.exists(destination):
        print("CREATING DIRECTORY ON HPSS: %s" % destination)
        p.makedirs(destination)
    p.close()
    dst_dir = destination
    src_pwd = os.getcwd()
    print("UPLOADING FILE: %s" % src_file)
    src_filesize = os.stat(source).st_size
    transfer_rate = 300 * 1000 * 1000  # 300 Mbytes/sec approx
    ETA = src_filesize / transfer_rate
    m, s = divmod(ETA, 60)
    h, m = divmod(m, 60)
    print("ETA: %d:%02d:%02d" % (h, m, s))
    with temporary_change_dir(os.chdir, src_pwd, src_dir):
        with timeit():
            client = Popen('pftp', stdin=PIPE, stdout=PIPE, stderr=PIPE)
            client.communicate("prompt \ncd {}\npput {}\n".format(dst_dir, src_file))
            client.wait()
    print("DONE UPLOADING")
    print("CHECKING FILESIZE...")
    dest_file = os.path.join(destination, os.path.basename(source))
    p = pftp()
    dest_filesize = p.size(dest_file)
    p.close()
    if src_filesize == dest_filesize:
        print("ALL BYTES TRANSFERRED :)")
    else:
        print(r"*?\-/* SIZE MISMATCH local({}) HPSS({})".format(src_filesize, dest_filesize))
    print('-' * 70)
    return source, dest_file
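# Usage sketch for xupload() (paths are hypothetical):
#
#     xupload('/work/bm0834/user/sample.nc', '/hpss/arch/bm0834/user')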
def cmdline_interface():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("cmd",
                        type=str,
                        choices=('uploadfile', 'uploadtree', 'downloadfile', 'downloadtree'))
    parser.add_argument("sourcefile",
                        type=str,
                        help='source file or directory')
    parser.add_argument("destination",
                        type=str,
                        help='destination path')
    args = parser.parse_args()
    p = pftp()
    if args.cmd == "uploadfile":
        p.uploadfile(args.sourcefile, args.destination)
    elif args.cmd == 'uploadtree':
        p.uploadtree(args.sourcefile, args.destination)
    else:
        p.close()
        raise NotImplementedError
    p.close()
def cli():
    "command line interface for pypftp"
    parser = argparse.ArgumentParser()
    subcommands = parser.add_subparsers(help="SUB-COMMANDS")
    upload_parser = subcommands.add_parser(
        "upload",
        help="upload file(s) to hpss",
        description="uploads source file(s) on localhost to destination path at HPSS",
    )
    upload_parser.add_argument(
        "-m", "--metadata",
        required=False,
        metavar="METADATA",
        help="filename to write metadata",
    )
    upload_parser.add_argument(
        "-d", "--dest",
        required=True,
        metavar="DEST",
        help="directory on HPSS where source files must be copied to",
    )
    upload_parser.add_argument(
        "SOURCE",
        nargs='+',
        help='file(s) to upload',
    )
    args = parser.parse_args()
    Basename = os.path.basename
    Abspath = os.path.abspath
    Join = os.path.join
    jsonfile = args.metadata
    hpssdir = args.dest
    sources = args.SOURCE
    command_type = sys.argv[1]
    if command_type == 'upload':
        for sfile in sources:
            sbase = Basename(sfile)
            sabspath = Abspath(sfile)
            if jsonfile is not None:
                Bigdata = meta_load(jsonfile) if os.path.exists(jsonfile) else []
                hpss_options = {}
                hpss_options['hpss_path'] = Join(hpssdir, sbase)
                hpss_options['hpss_added'] = datetime.datetime.strftime(
                    datetime.datetime.now(), "%d-%m-%Y")
                mdata = meta_collect(sabspath, hpss_options)
            # p = pftp()
            # p.uploadfile(sfile, hpssdir)
            # p.close()
            xupload(sfile, hpssdir)
            if jsonfile is not None:
                mdata['hpss_added'] = datetime.datetime.strftime(
                    datetime.datetime.now(), "%d-%m-%YT%T")
                if Bigdata:
                    meta_append(Bigdata, mdata)
                else:
                    Bigdata.append(mdata)
                meta_save(jsonfile, Bigdata)
if __name__ == "__main__":
    # cmdline_interface()
    cli()
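# Example invocation of the "upload" sub-command (module filename and paths are
# assumptions for illustration):
#
#     $ python pypftp.py upload -d /hpss/arch/bm0834/user -m archive.json sample1.nc sample2.nc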