# Files
# Axter-Stash/plugins/RenameFile/openedFile.py
#
# 188 lines
# 8.6 KiB
# Python

"""
openedFile (By David Maisonave aka Axter)
https://github.com/David-Maisonave/Axter-Stash/tree/main/plugins/RenameFile
Description:
Close all (open file) handles on all processes for a given file.
Use Case:
Can be used when a file needs to be deleted or moved,
but the file is locked by one or more other processes.
Requirements:
This class requires Sysinternals handle.exe, which can be downloaded from the following link:
https://learn.microsoft.com/en-us/sysinternals/downloads/handle
Important: **MUST call this class in admin mode with elevated privileges!!!**
Example Usage:
handleExe = r"C:\Sysinternals\handle64.exe"
of = openedFile(handleExe)
of.closeFile(r"B:\V\V\testdup\deleme2.mp4")
"""
import ctypes, os, sys, psutil, argparse, traceback, logging, numbers, string
from ctypes import wintypes
# from StashPluginHelper import StashPluginHelper
# Look at the following links to enhance this code:
# https://stackoverflow.com/questions/35106511/how-to-access-the-peb-of-another-process-with-python-ctypes
# https://www.codeproject.com/Articles/19685/Get-Process-Info-with-NtQueryInformationProcess
# Important: MUST call this class in admin mode with elevated privileges!!!
# This class has member function runMeAsAdmin, which will elevate privileges.
# When member function closeFile is called, it will call runMeAsAdmin as needed.
# getPid is the only function which does NOT require elevated admin privileges.
class openedFile():
    """Find and close the handles that other processes hold open on a file.

    The process IDs using a file are queried with NtQueryInformationFile
    (getPid). The handles of those processes are listed and closed by running
    the Sysinternals handle executable whose path is given to the constructor.

    getPid is the only member function which does NOT require elevated admin
    privileges; closeFile calls runMeAsAdmin before it needs them.
    """
    # generic strings and constants
    _IS_WINDOWS = os.name == 'nt'
    # ctypes.WinDLL only exists on Windows. isAdmin/runMeAsAdmin have non-Windows
    # branches, so the class must stay definable there: the DLLs are None and
    # getPid reports the missing platform support instead.
    ntdll = ctypes.WinDLL('ntdll') if _IS_WINDOWS else None
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) if _IS_WINDOWS else None
    NTSTATUS = wintypes.LONG
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    FILE_READ_ATTRIBUTES = 0x80
    FILE_SHARE_READ = 1
    OPEN_EXISTING = 3
    FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
    FILE_INFORMATION_CLASS = wintypes.ULONG
    FileProcessIdsUsingFileInformation = 47 # see https://learn.microsoft.com/en-us/windows-hardware/drivers/ddi/wdm/ne-wdm-_file_information_class
    LPSECURITY_ATTRIBUTES = wintypes.LPVOID
    ULONG_PTR = wintypes.WPARAM
    STATUS_INFO_LENGTH_MISMATCH = 0xC0000004 # NTSTATUS returned when the output buffer is too small
    _INITIAL_PID_CAPACITY = 64   # size of the first PID buffer (same as the historical fixed size)
    _MAX_PID_CAPACITY = 65536    # upper bound for the buffer-growing retry loop
    lastPath = None
    handleExe = None
    stash = None

    def __init__(self, handleExe, stash = None):
        """Store the handle executable path and the optional stash logger.

        handleExe: path to Sysinternals 'handle.exe' or 'handle64.exe'.
        stash:     optional object providing Log(msg) and Error(msg).
        Raises Exception when handleExe is empty or is not an existing file.
        """
        self.handleExe = handleExe
        self.stash = stash
        # "not handleExe" covers both None and "" and keeps None away from os.path.isfile.
        if not handleExe or not os.path.isfile(handleExe):
            raise Exception("handleExe requires a valid path to Sysinternals 'handle.exe' or 'handle64.exe' executable. Can be downloaded from following link:\nhttps://learn.microsoft.com/en-us/sysinternals/downloads/handle")
        self._configureWinApi()

    def _configureWinApi(self):
        """Declare the ctypes prototypes of the kernel32 functions used by getPid."""
        if self.kernel32 is None:
            return
        # create handle on concerned file with dwDesiredAccess == self.FILE_READ_ATTRIBUTES
        self.kernel32.CreateFileW.restype = wintypes.HANDLE
        self.kernel32.CreateFileW.argtypes = (
            wintypes.LPCWSTR,           # In     lpFileName
            wintypes.DWORD,             # In     dwDesiredAccess
            wintypes.DWORD,             # In     dwShareMode
            self.LPSECURITY_ATTRIBUTES, # In_opt lpSecurityAttributes
            wintypes.DWORD,             # In     dwCreationDisposition
            wintypes.DWORD,             # In     dwFlagsAndAttributes
            wintypes.HANDLE)            # In_opt hTemplateFile
        self.kernel32.CloseHandle.restype = wintypes.BOOL
        self.kernel32.CloseHandle.argtypes = (wintypes.HANDLE,)

    def getPid(self, path):
        """Return the list of process IDs currently using path, or None.

        Also remembers path in self.lastPath (the default path of getFileHandle).
        Raises OSError when the file cannot be opened (ctypes.WinError) or when
        the Windows DLLs are not available on this platform.
        """
        self.lastPath = path
        if self.kernel32 is None or self.ntdll is None:
            raise OSError("openedFile.getPid is only supported on Windows")
        # Repeated here so instances created without __init__ still get the prototypes.
        self._configureWinApi()
        hFile = self.kernel32.CreateFileW(
            path, self.FILE_READ_ATTRIBUTES, self.FILE_SHARE_READ, None, self.OPEN_EXISTING,
            self.FILE_FLAG_BACKUP_SEMANTICS, None)
        if hFile == self.INVALID_HANDLE_VALUE:
            raise ctypes.WinError(ctypes.get_last_error())
        try:
            pidList = self._queryProcessIds(hFile)
        finally:
            # Always release our own handle; otherwise this process keeps one
            # more handle open on the file it is trying to unlock.
            self.kernel32.CloseHandle(hFile)
        if pidList:
            return pidList
        return None

    def _queryProcessIds(self, hFile):
        """Run NtQueryInformationFile on hFile and return the PID list (None on failure)."""
        ntstatusType = self.NTSTATUS
        ulongPtrType = self.ULONG_PTR

        # prepare data types for system call
        class IO_STATUS_BLOCK(ctypes.Structure):
            class _STATUS(ctypes.Union):
                _fields_ = (('Status', ntstatusType),
                            ('Pointer', wintypes.LPVOID))
            _anonymous_ = ('_Status',)
            _fields_ = (('_Status', _STATUS),
                        ('Information', ulongPtrType))

        query = self.ntdll.NtQueryInformationFile
        query.restype = self.NTSTATUS
        query.argtypes = (
            wintypes.HANDLE,                 # In  FileHandle
            ctypes.POINTER(IO_STATUS_BLOCK), # Out IoStatusBlock
            wintypes.LPVOID,                 # Out FileInformation
            wintypes.ULONG,                  # In  Length
            self.FILE_INFORMATION_CLASS)     # In  FileInformationClass

        capacity = self._INITIAL_PID_CAPACITY
        while True:
            # Documented layout: ULONG count followed by an array of ULONG_PTR.
            # ctypes inserts the alignment padding, so this is right on 32 and 64 bit.
            class FILE_PROCESS_IDS_USING_FILE_INFORMATION(ctypes.Structure):
                _fields_ = (('NumberOfProcessIdsInList', wintypes.ULONG),
                            ('ProcessIdList', ulongPtrType * capacity))
            iosb = IO_STATUS_BLOCK()
            info = FILE_PROCESS_IDS_USING_FILE_INFORMATION()
            # system call to retrieve list of PIDs currently using the file
            status = query(hFile, ctypes.byref(iosb), ctypes.byref(info),
                           ctypes.sizeof(info), self.FileProcessIdsUsingFileInformation)
            tooSmall = (status & 0xFFFFFFFF) == self.STATUS_INFO_LENGTH_MISMATCH
            if not tooSmall or capacity >= self._MAX_PID_CAPACITY:
                break
            capacity *= 2  # more PIDs than the buffer can hold: retry with a bigger one
        if status < 0:
            # Negative NTSTATUS means failure; the buffer content is not meaningful.
            # Stay best-effort (no PIDs) as before, but make the failure visible.
            if self.stash is not None: self.stash.Error(f"NtQueryInformationFile failed with NTSTATUS 0x{status & 0xFFFFFFFF:08X}")
            return None
        count = min(info.NumberOfProcessIdsInList, capacity)
        return list(info.ProcessIdList[0:count])

    def isAdmin(self):
        """Return True when the current process runs with admin (root) privileges."""
        if os.name == 'nt':
            try:
                return bool(ctypes.windll.shell32.IsUserAnAdmin())
            except Exception:
                # Best effort: when the check itself fails, report "not admin".
                return False
        return os.getuid() == 0 # For unix like systems

    def runMeAsAdmin(self):
        """Relaunch the running script with elevated privileges unless already admin.

        On Windows the elevated copy is started and the current process exits.
        """
        if self.isAdmin():
            return
        if os.name == 'nt':
            # Below is a Windows only method which does NOT popup a console.
            import subprocess
            import win32com.shell.shell as shell # Requires: pip install pywin32
            script = os.path.abspath(sys.argv[0])
            # list2cmdline re-quotes arguments containing spaces or quotes; a plain
            # ' '.join would split such arguments in the elevated process.
            params = subprocess.list2cmdline([script] + sys.argv[1:])
            shell.ShellExecuteEx(lpVerb='runas', lpFile=sys.executable, lpParameters=params)
            sys.exit(0)
        else:
            from elevate import elevate # Requires: pip install elevate
            elevate()

    def getPidExeFileName(self, pid): # Requires running with admin privileges.
        """Return the executable path and file name of the process with the given pid."""
        import win32api, win32con, win32process
        # GetModuleFileNameEx only needs query + VM read rights; asking for
        # PROCESS_ALL_ACCESS fails on processes where those two would be granted.
        access = win32con.PROCESS_QUERY_INFORMATION | win32con.PROCESS_VM_READ
        handle = win32api.OpenProcess(access, False, pid) #get handle for the pid
        try:
            return win32process.GetModuleFileNameEx(handle, 0) #get exe path & filename for handle
        finally:
            handle.Close()

    def getFilesOpen(self, pid:int): # Requires running with admin privileges.
        """Return psutil's list of files opened by the process with the given pid."""
        p = psutil.Process(pid)
        return p.open_files()

    def _runHandleExe(self, options, log = False):
        """Run the handle executable with the given options and return its stdout text.

        The command is passed as an argument list (no shell), so a handleExe
        path containing spaces works. Returns "" when the executable cannot be started.
        """
        import subprocess
        cmd = [str(self.handleExe)] + [str(opt) for opt in options]
        if log and self.stash is not None: self.stash.Log(" ".join(cmd))
        try:
            # Only stdout is captured (as os.popen did); stderr stays attached to the caller's.
            completed = subprocess.run(cmd, stdout=subprocess.PIPE, universal_newlines=True)
        except OSError as e:
            if self.stash is not None: self.stash.Error(f"Could not run {self.handleExe}; error={e}")
            return ""
        return completed.stdout or ""

    @staticmethod
    def _matchingHandles(lines, path):
        """Return the handle values (text before the first ':') of the lines ending with path."""
        # Compared case-insensitively: the caller's spelling of the path may
        # differ in case from the one printed by the handle executable.
        wanted = path.casefold()
        hdls = []
        for line in lines:
            if not line.casefold().endswith(wanted):
                continue
            epos = line.find(":")
            if epos <= 0:
                break  # matching line without a handle column: stop scanning (historical behavior)
            # Strip the column padding so the value can be passed as one argument to "-c".
            hdls.append(line[0:epos].strip())
        return hdls

    def getFileHandle(self, pid, path = None): # Requires running with admin privileges.
        """Return the handle values process pid holds on path, or None when there are none.

        path defaults to the path given to the last getPid call.
        """
        if path is None:
            path = self.lastPath
        if not path:
            # An empty path would match every output line (str.endswith("")),
            # i.e. every handle of the process; never report those.
            return None
        output = self._runHandleExe(["-p", pid, "-nobanner"])
        hdls = self._matchingHandles(output.splitlines(), path)
        if not hdls:
            return None
        return hdls

    def closeHandle(self, pid, fileHandle): # Requires running with admin privileges.
        """Close fileHandle in process pid; return True when the handle executable reports success."""
        results = self._runHandleExe(["-p", pid, "-c", str(fileHandle).strip(), "-y", "-nobanner"], log = True)
        results = results.strip()
        if results.endswith("Handle closed."):
            return True
        if self.stash is not None: self.stash.Error(f"Could not close pid {pid} file handle {fileHandle}; results={results}")
        return False

    def closeFile(self, path): # Requires running with admin privileges.
        """Close every handle other processes hold on path.

        Returns None when no process uses the file or no handle was found,
        otherwise {"results": [bool per closed handle], "pids": [pids using the file]}.
        """
        pids = self.getPid(path)
        if pids is None:
            return None
        results = []
        # Need admin privileges starting here.
        self.runMeAsAdmin()
        for pid in pids:
            hdls = self.getFileHandle(pid, path)
            if hdls is None:
                continue
            results.extend(self.closeHandle(pid, hdl) for hdl in hdls)
        if not results:
            return None
        return {"results" : results, "pids" : pids}