Browse Source

Correct Direct Unpack locking behavior for multisets

tags/2.2.0Alpha3
Safihre 8 years ago
parent
commit
232d537d23
  1. 51
      sabnzbd/directunpacker.py

51
sabnzbd/directunpacker.py

@ -73,15 +73,19 @@ class DirectUnpacker(threading.Thread):
def save(self): def save(self):
pass pass
def release_concurrent_lock(self):
""" Let other unpackers go """
try:
CONCURRENT_LOCK.release()
except:
pass
def reset_active(self): def reset_active(self):
self.active_instance = None self.active_instance = None
self.cur_setname = None self.cur_setname = None
self.cur_volume = 0 self.cur_volume = 0
# Release lock to be sure # Release lock to be sure
try: self.release_concurrent_lock()
CONCURRENT_LOCK.release()
except:
pass
def check_requirements(self): def check_requirements(self):
if self.killed or not self.nzo.unpack or cfg.direct_unpack() < 1 or sabnzbd.newsunpack.RAR_PROBLEM: if self.killed or not self.nzo.unpack or cfg.direct_unpack() < 1 or sabnzbd.newsunpack.RAR_PROBLEM:
@ -126,9 +130,9 @@ class DirectUnpacker(threading.Thread):
# Are we doing this set? # Are we doing this set?
if self.cur_setname == nzf.setname: if self.cur_setname == nzf.setname:
logging.debug('Queued %s for %s', nzf.filename, self.cur_setname) logging.debug('DirectUnpack queued %s for %s', nzf.filename, self.cur_setname)
# Is this the first one? # Is this the first one of the first set?
if not self.active_instance and self.have_next_volume(): if not self.active_instance and not self.is_alive() and self.have_next_volume():
# Too many runners already? # Too many runners already?
if len(ACTIVE_UNPACKERS) >= MAX_ACTIVE_UNPACKERS: if len(ACTIVE_UNPACKERS) >= MAX_ACTIVE_UNPACKERS:
logging.info('Too many DirectUnpackers currently to start %s', self.cur_setname) logging.info('Too many DirectUnpackers currently to start %s', self.cur_setname)
@ -178,6 +182,9 @@ class DirectUnpacker(threading.Thread):
self.success_sets.append(self.cur_setname) self.success_sets.append(self.cur_setname)
logging.info('DirectUnpack completed for %s', self.cur_setname) logging.info('DirectUnpack completed for %s', self.cur_setname)
# Make sure to release the lock
self.release_concurrent_lock()
# Are there more files left? # Are there more files left?
if self.nzo.files: if self.nzo.files:
with self.next_file_lock: with self.next_file_lock:
@ -195,22 +202,18 @@ class DirectUnpacker(threading.Thread):
nzf = self.next_sets.pop(0) nzf = self.next_sets.pop(0)
self.reset_active() self.reset_active()
self.cur_setname = nzf.setname self.cur_setname = nzf.setname
# Wait for the 1st volume to appear
self.wait_for_next_volume()
self.create_unrar_instance(nzf) self.create_unrar_instance(nzf)
else: else:
break break
if linebuf.endswith('[C]ontinue, [Q]uit '): if linebuf.endswith('[C]ontinue, [Q]uit '):
# Next one can go now # Next one can go now
try: self.release_concurrent_lock()
CONCURRENT_LOCK.release()
except: # Wait for the next one..
pass self.wait_for_next_volume()
# Wait for the volume to appear
# But stop if it was killed or the NZB is done
while not self.have_next_volume() and not self.killed and self.nzo.files:
with self.next_file_lock:
self.next_file_lock.wait()
# Send "Enter" to proceed, only 1 at a time via lock # Send "Enter" to proceed, only 1 at a time via lock
CONCURRENT_LOCK.acquire() CONCURRENT_LOCK.acquire()
@ -240,10 +243,7 @@ class DirectUnpacker(threading.Thread):
ACTIVE_UNPACKERS.remove(self) ACTIVE_UNPACKERS.remove(self)
# Make sure to release the lock # Make sure to release the lock
try: self.release_concurrent_lock()
CONCURRENT_LOCK.release()
except:
pass
def have_next_volume(self): def have_next_volume(self):
""" Check if next volume of set is available, start """ Check if next volume of set is available, start
@ -253,6 +253,13 @@ class DirectUnpacker(threading.Thread):
return True return True
return False return False
def wait_for_next_volume(self):
""" Wait for the correct volume to appear
But stop if it was killed or the NZB is done """
while not self.have_next_volume() and not self.killed and self.nzo.files:
with self.next_file_lock:
self.next_file_lock.wait()
def create_unrar_instance(self, rarfile_nzf): def create_unrar_instance(self, rarfile_nzf):
""" Start the unrar instance using the user's options """ """ Start the unrar instance using the user's options """
# Generate extraction path and save for post-proc # Generate extraction path and save for post-proc
@ -291,6 +298,8 @@ class DirectUnpacker(threading.Thread):
stup, need_shell, command, creationflags = build_command(command) stup, need_shell, command, creationflags = build_command(command)
logging.debug('Running unrar for DirectUnpack %s', command) logging.debug('Running unrar for DirectUnpack %s', command)
# Aquire lock and go
self.active_instance = Popen(command, shell=need_shell, stdin=subprocess.PIPE, self.active_instance = Popen(command, shell=need_shell, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
startupinfo=stup, creationflags=creationflags) startupinfo=stup, creationflags=creationflags)

Loading…
Cancel
Save