Source code for shelljob.fs

"""
	A collection of filesystem related commands.
"""
import os, re, tempfile

[docs]def find( path, include_dirs = True, include_files = True, name_regex = None, not_name_regex = None, whole_name_regex = None, not_whole_name_regex = None, exclude_root = False, relative = False, limit_depth = None ): """ Creates an iterator of files matching a variety of conditions. @param path: which path to iterate @param include_dirs: include directories in output @param include_files: include files in output @param name_regex: optional regex string compared against basename of file @param not_name_regex: if specificed only produces names not matching this regex @param whole_name_regex: like name_regex but applies to whole path, not just basename @param not_whole_name_regex: like not_name_regex but applies to whole path @param exclude_root: do not include the intput 'path' itself in the output @param limit_depth: do not list items deeper than this level from root @param relative: filenames are relative to "path" as opposed to appended to path @return: a generator for the matched files """ def maybe_regex(arg): return re.compile(arg) if arg != None else None c_name_regex = maybe_regex(name_regex) c_not_name_regex = maybe_regex(not_name_regex) c_whole_name_regex = maybe_regex(whole_name_regex) c_not_whole_name_regex = maybe_regex(not_whole_name_regex) def check_name(name, whole_name): if c_name_regex != None and not c_name_regex.match( name ): return False if c_not_name_regex != None and c_not_name_regex.match( name ): return False if c_whole_name_regex != None and not c_whole_name_regex.match( whole_name ): return False if c_not_whole_name_regex != None and c_not_whole_name_regex.match( whole_name ): return False return True def result( whole, rel ): if relative: return rel else: return whole def filter_func(): # A list of paths still to be processed (depth, whole_path, relative_path) queue = [ ( 0, path, '' ) ] while len(queue) != 0: depth, root, rel_path = queue[0] queue = queue[1:] if root == path and exclude_root: pass elif include_dirs and check_name( os.path.basename(root), root ): yield result( root, rel_path ) if limit_depth != None and depth > limit_depth: continue for item in os.listdir(root): whole = os.path.join( root, item ) rel = os.path.join( rel_path, item ) if os.path.isdir(whole): queue.append( ( depth + 1, whole, rel ) ) elif include_files and check_name( item, whole ): yield result( whole, rel ) return filter_func()
[docs]class NamedTempFile: """ Creates a temporary file for a 'with' block. The file is deleted when the block exits. This creates the file to ensure it exists/block a race, but does not write anything to it, nor does it keep it open. It is intended for times when you need a named file for subprocesses. Example:: with fs.NamedTempFile() as nm: proc.call( "curl http://mortoray.com/ -o {}".format( nm ) ) html = open(nm).read() print( len(html) ) """ def __init__(self, suffix = None, prefix = None, dir = None ): """ @param suffix: optional suffix for generated filename (a dot '.' is not automatically added, specifiy it if desired) @param prefix: optional prefix for generated filename @param dir: in which directory, if None then use a system default """ self.args = { 'text': False, } if suffix != None: self.args['suffix'] = suffix if prefix != None: self.args['prefix'] = prefix if dir != None: self.args['dir'] = dir def __enter__(self): (handle, name) = tempfile.mkstemp( **self.args ) os.close(handle) self.name = name return name def __exit__(self, type, value, traceback): os.remove(self.name)