5 changed files with 724 additions and 137 deletions
@ -1,111 +0,0 @@ |
|||
# Based on netprowl by the following authors. |
|||
|
|||
# Altered 1st October 2010 - Tim Child. |
|||
# Have added the ability for the command line arguments to take a password. |
|||
|
|||
# Altered 1-17-2010 - Tanner Stokes - www.tannr.com |
|||
# Added support for command line arguments |
|||
|
|||
# ORIGINAL CREDITS |
|||
# """Growl 0.6 Network Protocol Client for Python""" |
|||
# __version__ = "0.6.3" |
|||
# __author__ = "Rui Carmo (http://the.taoofmac.com)" |
|||
# __copyright__ = "(C) 2004 Rui Carmo. Code under BSD License." |
|||
# __contributors__ = "Ingmar J Stein (Growl Team), John Morrissey (hashlib patch)" |
|||
|
|||
import struct |
|||
|
|||
try: |
|||
import hashlib |
|||
md5_constructor = hashlib.md5 |
|||
except ImportError: |
|||
import md5 |
|||
md5_constructor = md5.new |
|||
|
|||
GROWL_UDP_PORT = 9887 |
|||
GROWL_PROTOCOL_VERSION = 1 |
|||
GROWL_TYPE_REGISTRATION = 0 |
|||
GROWL_TYPE_NOTIFICATION = 1 |
|||
|
|||
|
|||
class GrowlRegistrationPacket(object): |
|||
"""Builds a Growl Network Registration packet. |
|||
Defaults to emulating the command-line growlnotify utility.""" |
|||
|
|||
def __init__(self, application = "CouchPotato", password = None): |
|||
self.notifications = [] |
|||
self.defaults = [] # array of indexes into notifications |
|||
self.application = application.encode("utf-8") |
|||
self.password = password |
|||
|
|||
def addNotification(self, notification = "General Notification", enabled = True): |
|||
"""Adds a notification type and sets whether it is enabled on the GUI""" |
|||
|
|||
self.notifications.append(notification) |
|||
if enabled: |
|||
self.defaults.append(len(self.notifications) - 1) |
|||
|
|||
def payload(self): |
|||
"""Returns the packet payload.""" |
|||
self.data = struct.pack("!BBH", |
|||
GROWL_PROTOCOL_VERSION, |
|||
GROWL_TYPE_REGISTRATION, |
|||
len(self.application) |
|||
) |
|||
self.data += struct.pack("BB", |
|||
len(self.notifications), |
|||
len(self.defaults) |
|||
) |
|||
self.data += self.application |
|||
for notification in self.notifications: |
|||
encoded = notification.encode("utf-8") |
|||
self.data += struct.pack("!H", len(encoded)) |
|||
self.data += encoded |
|||
for default in self.defaults: |
|||
self.data += struct.pack("B", default) |
|||
self.checksum = md5_constructor() |
|||
self.checksum.update(self.data) |
|||
if self.password: |
|||
self.checksum.update(self.password) |
|||
self.data += self.checksum.digest() |
|||
return self.data |
|||
|
|||
class GrowlNotificationPacket(object): |
|||
"""Builds a Growl Network Notification packet. |
|||
Defaults to emulating the command-line growlnotify utility.""" |
|||
|
|||
def __init__(self, application = "CouchPotato", |
|||
notification = "General Notification", title = "Title", |
|||
description = "Description", priority = 0, sticky = False, password = None): |
|||
|
|||
self.application = application.encode("utf-8") |
|||
self.notification = notification.encode("utf-8") |
|||
self.title = title.encode("utf-8") |
|||
self.description = description.encode("utf-8") |
|||
flags = (priority & 0x07) * 2 |
|||
if priority < 0: |
|||
flags |= 0x08 |
|||
if sticky: |
|||
flags = flags | 0x0100 |
|||
self.data = struct.pack("!BBHHHHH", |
|||
GROWL_PROTOCOL_VERSION, |
|||
GROWL_TYPE_NOTIFICATION, |
|||
flags, |
|||
len(self.notification), |
|||
len(self.title), |
|||
len(self.description), |
|||
len(self.application) |
|||
) |
|||
self.data += self.notification |
|||
self.data += self.title |
|||
self.data += self.description |
|||
self.data += self.application |
|||
self.checksum = md5_constructor() |
|||
self.checksum.update(self.data) |
|||
if password: |
|||
self.checksum.update(password) |
|||
self.data += self.checksum.digest() |
|||
|
|||
def payload(self): |
|||
"""Returns the packet payload.""" |
|||
return self.data |
@ -1,31 +1,46 @@ |
|||
from couchpotato.core.logger import CPLog |
|||
from couchpotato.core.notifications.base import Notification |
|||
from couchpotato.core.notifications.growl.growl import GROWL_UDP_PORT, \ |
|||
GrowlRegistrationPacket, GrowlNotificationPacket |
|||
from socket import AF_INET, SOCK_DGRAM, socket |
|||
from gntp import notifier |
|||
import logging |
|||
|
|||
log = CPLog(__name__) |
|||
|
|||
|
|||
class Growl(Notification): |
|||
|
|||
def notify(self, type = '', message = '', data = {}): |
|||
if self.isDisabled(): return |
|||
def __init__(self): |
|||
super(Growl, self).__init__() |
|||
|
|||
hosts = [x.strip() for x in self.conf('host').split(",")] |
|||
password = self.conf('password') |
|||
logger = logging.getLogger('gntp.notifier') |
|||
logger.disabled = True |
|||
|
|||
for curHost in hosts: |
|||
addr = (curHost, GROWL_UDP_PORT) |
|||
try: |
|||
self.growl = notifier.GrowlNotifier( |
|||
applicationName = 'CouchPotato', |
|||
notifications = ["Updates"], |
|||
defaultNotifications = ["Updates"], |
|||
applicationIcon = 'http://couchpotatoapp.com/media/images/couch.png', |
|||
) |
|||
self.growl.register() |
|||
except: |
|||
pass |
|||
|
|||
s = socket(AF_INET, SOCK_DGRAM) |
|||
p = GrowlRegistrationPacket(password = password) |
|||
p.addNotification() |
|||
s.sendto(p.payload(), addr) |
|||
def notify(self, type = '', message = '', data = {}): |
|||
if self.isDisabled(): return |
|||
|
|||
# send notification |
|||
p = GrowlNotificationPacket(title = self.default_title, description = message, priority = 0, sticky = False, password = password) |
|||
s.sendto(p.payload(), addr) |
|||
s.close() |
|||
try: |
|||
self.growl.notify( |
|||
noteType = "Updates", |
|||
title = self.default_title, |
|||
description = message, |
|||
sticky = False, |
|||
priority = 1, |
|||
) |
|||
|
|||
log.info('Growl notifications sent.') |
|||
return True |
|||
except: |
|||
log.error('Failed growl notification.') |
|||
|
|||
return False |
|||
|
|||
|
@ -0,0 +1,483 @@ |
|||
import re |
|||
import hashlib |
|||
import time |
|||
|
|||
__version__ = '0.5' |
|||
|
|||
#GNTP/<version> <messagetype> <encryptionAlgorithmID>[:<ivValue>][ <keyHashAlgorithmID>:<keyHash>.<salt>] |
|||
GNTP_INFO_LINE = re.compile( |
|||
'GNTP/(?P<version>\d+\.\d+) (?P<messagetype>REGISTER|NOTIFY|SUBSCRIBE|\-OK|\-ERROR)' + |
|||
' (?P<encryptionAlgorithmID>[A-Z0-9]+(:(?P<ivValue>[A-F0-9]+))?) ?' + |
|||
'((?P<keyHashAlgorithmID>[A-Z0-9]+):(?P<keyHash>[A-F0-9]+).(?P<salt>[A-F0-9]+))?\r\n', |
|||
re.IGNORECASE |
|||
) |
|||
|
|||
GNTP_INFO_LINE_SHORT = re.compile( |
|||
'GNTP/(?P<version>\d+\.\d+) (?P<messagetype>REGISTER|NOTIFY|SUBSCRIBE|\-OK|\-ERROR)', |
|||
re.IGNORECASE |
|||
) |
|||
|
|||
GNTP_HEADER = re.compile('([\w-]+):(.+)') |
|||
|
|||
GNTP_EOL = u'\r\n' |
|||
|
|||
|
|||
class BaseError(Exception): |
|||
def gntp_error(self): |
|||
error = GNTPError(self.errorcode, self.errordesc) |
|||
return error.encode() |
|||
|
|||
|
|||
class ParseError(BaseError): |
|||
errorcode = 500 |
|||
errordesc = 'Error parsing the message' |
|||
|
|||
|
|||
class AuthError(BaseError): |
|||
errorcode = 400 |
|||
errordesc = 'Error with authorization' |
|||
|
|||
|
|||
class UnsupportedError(BaseError): |
|||
errorcode = 500 |
|||
errordesc = 'Currently unsupported by gntp.py' |
|||
|
|||
|
|||
class _GNTPBase(object): |
|||
"""Base initilization |
|||
|
|||
:param string messagetype: GNTP Message type |
|||
:param string version: GNTP Protocol version |
|||
:param string encription: Encryption protocol |
|||
""" |
|||
def __init__(self, messagetype=None, version='1.0', encryption=None): |
|||
self.info = { |
|||
'version': version, |
|||
'messagetype': messagetype, |
|||
'encryptionAlgorithmID': encryption |
|||
} |
|||
self.headers = {} |
|||
self.resources = {} |
|||
|
|||
def __str__(self): |
|||
return self.encode() |
|||
|
|||
def _parse_info(self, data): |
|||
"""Parse the first line of a GNTP message to get security and other info values |
|||
|
|||
:param string data: GNTP Message |
|||
:return dict: Parsed GNTP Info line |
|||
""" |
|||
|
|||
match = GNTP_INFO_LINE.match(data) |
|||
|
|||
if not match: |
|||
raise ParseError('ERROR_PARSING_INFO_LINE') |
|||
|
|||
info = match.groupdict() |
|||
if info['encryptionAlgorithmID'] == 'NONE': |
|||
info['encryptionAlgorithmID'] = None |
|||
|
|||
return info |
|||
|
|||
def set_password(self, password, encryptAlgo='MD5'): |
|||
"""Set a password for a GNTP Message |
|||
|
|||
:param string password: Null to clear password |
|||
:param string encryptAlgo: Supports MD5, SHA1, SHA256, SHA512 |
|||
""" |
|||
hash = { |
|||
'MD5': hashlib.md5, |
|||
'SHA1': hashlib.sha1, |
|||
'SHA256': hashlib.sha256, |
|||
'SHA512': hashlib.sha512, |
|||
} |
|||
|
|||
self.password = password |
|||
self.encryptAlgo = encryptAlgo.upper() |
|||
if not password: |
|||
self.info['encryptionAlgorithmID'] = None |
|||
self.info['keyHashAlgorithm'] = None |
|||
return |
|||
if not self.encryptAlgo in hash.keys(): |
|||
raise UnsupportedError('INVALID HASH "%s"' % self.encryptAlgo) |
|||
|
|||
hashfunction = hash.get(self.encryptAlgo) |
|||
|
|||
password = password.encode('utf8') |
|||
seed = time.ctime() |
|||
salt = hashfunction(seed).hexdigest() |
|||
saltHash = hashfunction(seed).digest() |
|||
keyBasis = password + saltHash |
|||
key = hashfunction(keyBasis).digest() |
|||
keyHash = hashfunction(key).hexdigest() |
|||
|
|||
self.info['keyHashAlgorithmID'] = self.encryptAlgo |
|||
self.info['keyHash'] = keyHash.upper() |
|||
self.info['salt'] = salt.upper() |
|||
|
|||
def _decode_hex(self, value): |
|||
"""Helper function to decode hex string to `proper` hex string |
|||
|
|||
:param string value: Human readable hex string |
|||
:return string: Hex string |
|||
""" |
|||
result = '' |
|||
for i in range(0, len(value), 2): |
|||
tmp = int(value[i:i + 2], 16) |
|||
result += chr(tmp) |
|||
return result |
|||
|
|||
def _decode_binary(self, rawIdentifier, identifier): |
|||
rawIdentifier += '\r\n\r\n' |
|||
dataLength = int(identifier['Length']) |
|||
pointerStart = self.raw.find(rawIdentifier) + len(rawIdentifier) |
|||
pointerEnd = pointerStart + dataLength |
|||
data = self.raw[pointerStart:pointerEnd] |
|||
if not len(data) == dataLength: |
|||
raise ParseError('INVALID_DATA_LENGTH Expected: %s Recieved %s' % (dataLength, len(data))) |
|||
return data |
|||
|
|||
def _validate_password(self, password): |
|||
"""Validate GNTP Message against stored password""" |
|||
self.password = password |
|||
if password == None: |
|||
raise AuthError('Missing password') |
|||
keyHash = self.info.get('keyHash', None) |
|||
if keyHash is None and self.password is None: |
|||
return True |
|||
if keyHash is None: |
|||
raise AuthError('Invalid keyHash') |
|||
if self.password is None: |
|||
raise AuthError('Missing password') |
|||
|
|||
password = self.password.encode('utf8') |
|||
saltHash = self._decode_hex(self.info['salt']) |
|||
|
|||
keyBasis = password + saltHash |
|||
key = hashlib.md5(keyBasis).digest() |
|||
keyHash = hashlib.md5(key).hexdigest() |
|||
|
|||
if not keyHash.upper() == self.info['keyHash'].upper(): |
|||
raise AuthError('Invalid Hash') |
|||
return True |
|||
|
|||
def validate(self): |
|||
"""Verify required headers""" |
|||
for header in self._requiredHeaders: |
|||
if not self.headers.get(header, False): |
|||
raise ParseError('Missing Notification Header: ' + header) |
|||
|
|||
def _format_info(self): |
|||
"""Generate info line for GNTP Message |
|||
|
|||
:return string: |
|||
""" |
|||
info = u'GNTP/%s %s' % ( |
|||
self.info.get('version'), |
|||
self.info.get('messagetype'), |
|||
) |
|||
if self.info.get('encryptionAlgorithmID', None): |
|||
info += ' %s:%s' % ( |
|||
self.info.get('encryptionAlgorithmID'), |
|||
self.info.get('ivValue'), |
|||
) |
|||
else: |
|||
info += ' NONE' |
|||
|
|||
if self.info.get('keyHashAlgorithmID', None): |
|||
info += ' %s:%s.%s' % ( |
|||
self.info.get('keyHashAlgorithmID'), |
|||
self.info.get('keyHash'), |
|||
self.info.get('salt') |
|||
) |
|||
|
|||
return info |
|||
|
|||
def _parse_dict(self, data): |
|||
"""Helper function to parse blocks of GNTP headers into a dictionary |
|||
|
|||
:param string data: |
|||
:return dict: |
|||
""" |
|||
dict = {} |
|||
for line in data.split('\r\n'): |
|||
match = GNTP_HEADER.match(line) |
|||
if not match: |
|||
continue |
|||
|
|||
key = match.group(1).strip() |
|||
val = match.group(2).strip() |
|||
dict[key] = val |
|||
return dict |
|||
|
|||
def add_header(self, key, value): |
|||
if isinstance(value, unicode): |
|||
self.headers[key] = value |
|||
else: |
|||
self.headers[key] = unicode('%s' % value, 'utf8', 'replace') |
|||
|
|||
def decode(self, data, password=None): |
|||
"""Decode GNTP Message |
|||
|
|||
:param string data: |
|||
""" |
|||
self.password = password |
|||
self.raw = data |
|||
parts = self.raw.split('\r\n\r\n') |
|||
self.info = self._parse_info(data) |
|||
self.headers = self._parse_dict(parts[0]) |
|||
|
|||
def encode(self): |
|||
"""Encode a GNTP Message |
|||
|
|||
:return string: Encoded GNTP Message ready to be sent |
|||
""" |
|||
self.validate() |
|||
|
|||
message = self._format_info() + GNTP_EOL |
|||
#Headers |
|||
for k, v in self.headers.iteritems(): |
|||
message += u'%s: %s%s' % (k, v, GNTP_EOL) |
|||
|
|||
message += GNTP_EOL |
|||
return message |
|||
|
|||
|
|||
class GNTPRegister(_GNTPBase): |
|||
"""Represents a GNTP Registration Command |
|||
|
|||
:param string data: (Optional) See decode() |
|||
:param string password: (Optional) Password to use while encoding/decoding messages |
|||
""" |
|||
_requiredHeaders = [ |
|||
'Application-Name', |
|||
'Notifications-Count' |
|||
] |
|||
_requiredNotificationHeaders = ['Notification-Name'] |
|||
|
|||
def __init__(self, data=None, password=None): |
|||
_GNTPBase.__init__(self, 'REGISTER') |
|||
self.notifications = [] |
|||
|
|||
if data: |
|||
self.decode(data, password) |
|||
else: |
|||
self.set_password(password) |
|||
self.add_header('Application-Name', 'pygntp') |
|||
self.add_header('Notifications-Count', 0) |
|||
|
|||
def validate(self): |
|||
'''Validate required headers and validate notification headers''' |
|||
for header in self._requiredHeaders: |
|||
if not self.headers.get(header, False): |
|||
raise ParseError('Missing Registration Header: ' + header) |
|||
for notice in self.notifications: |
|||
for header in self._requiredNotificationHeaders: |
|||
if not notice.get(header, False): |
|||
raise ParseError('Missing Notification Header: ' + header) |
|||
|
|||
def decode(self, data, password): |
|||
"""Decode existing GNTP Registration message |
|||
|
|||
:param string data: Message to decode |
|||
""" |
|||
self.raw = data |
|||
parts = self.raw.split('\r\n\r\n') |
|||
self.info = self._parse_info(data) |
|||
self._validate_password(password) |
|||
self.headers = self._parse_dict(parts[0]) |
|||
|
|||
for i, part in enumerate(parts): |
|||
if i == 0: |
|||
continue # Skip Header |
|||
if part.strip() == '': |
|||
continue |
|||
notice = self._parse_dict(part) |
|||
if notice.get('Notification-Name', False): |
|||
self.notifications.append(notice) |
|||
elif notice.get('Identifier', False): |
|||
notice['Data'] = self._decode_binary(part, notice) |
|||
#open('register.png','wblol').write(notice['Data']) |
|||
self.resources[notice.get('Identifier')] = notice |
|||
|
|||
def add_notification(self, name, enabled=True): |
|||
"""Add new Notification to Registration message |
|||
|
|||
:param string name: Notification Name |
|||
:param boolean enabled: Enable this notification by default |
|||
""" |
|||
notice = {} |
|||
notice['Notification-Name'] = u'%s' % name |
|||
notice['Notification-Enabled'] = u'%s' % enabled |
|||
|
|||
self.notifications.append(notice) |
|||
self.add_header('Notifications-Count', len(self.notifications)) |
|||
|
|||
def encode(self): |
|||
"""Encode a GNTP Registration Message |
|||
|
|||
:return string: Encoded GNTP Registration message |
|||
""" |
|||
self.validate() |
|||
|
|||
message = self._format_info() + GNTP_EOL |
|||
#Headers |
|||
for k, v in self.headers.iteritems(): |
|||
message += u'%s: %s%s' % (k, v, GNTP_EOL) |
|||
|
|||
#Notifications |
|||
if len(self.notifications) > 0: |
|||
for notice in self.notifications: |
|||
message += GNTP_EOL |
|||
for k, v in notice.iteritems(): |
|||
message += u'%s: %s%s' % (k, v, GNTP_EOL) |
|||
|
|||
message += GNTP_EOL |
|||
return message |
|||
|
|||
|
|||
class GNTPNotice(_GNTPBase): |
|||
"""Represents a GNTP Notification Command |
|||
|
|||
:param string data: (Optional) See decode() |
|||
:param string app: (Optional) Set Application-Name |
|||
:param string name: (Optional) Set Notification-Name |
|||
:param string title: (Optional) Set Notification Title |
|||
:param string password: (Optional) Password to use while encoding/decoding messages |
|||
""" |
|||
_requiredHeaders = [ |
|||
'Application-Name', |
|||
'Notification-Name', |
|||
'Notification-Title' |
|||
] |
|||
|
|||
def __init__(self, data=None, app=None, name=None, title=None, password=None): |
|||
_GNTPBase.__init__(self, 'NOTIFY') |
|||
|
|||
if data: |
|||
self.decode(data, password) |
|||
else: |
|||
self.set_password(password) |
|||
if app: |
|||
self.add_header('Application-Name', app) |
|||
if name: |
|||
self.add_header('Notification-Name', name) |
|||
if title: |
|||
self.add_header('Notification-Title', title) |
|||
|
|||
def decode(self, data, password): |
|||
"""Decode existing GNTP Notification message |
|||
|
|||
:param string data: Message to decode. |
|||
""" |
|||
self.raw = data |
|||
parts = self.raw.split('\r\n\r\n') |
|||
self.info = self._parse_info(data) |
|||
self._validate_password(password) |
|||
self.headers = self._parse_dict(parts[0]) |
|||
|
|||
for i, part in enumerate(parts): |
|||
if i == 0: |
|||
continue # Skip Header |
|||
if part.strip() == '': |
|||
continue |
|||
notice = self._parse_dict(part) |
|||
if notice.get('Identifier', False): |
|||
notice['Data'] = self._decode_binary(part, notice) |
|||
#open('notice.png','wblol').write(notice['Data']) |
|||
self.resources[notice.get('Identifier')] = notice |
|||
|
|||
def encode(self): |
|||
"""Encode a GNTP Notification Message |
|||
|
|||
:return string: GNTP Notification Message ready to be sent |
|||
""" |
|||
self.validate() |
|||
|
|||
message = self._format_info() + GNTP_EOL |
|||
#Headers |
|||
for k, v in self.headers.iteritems(): |
|||
message += u'%s: %s%s' % (k, v, GNTP_EOL) |
|||
|
|||
message += GNTP_EOL |
|||
return message |
|||
|
|||
|
|||
class GNTPSubscribe(_GNTPBase): |
|||
"""Represents a GNTP Subscribe Command |
|||
|
|||
:param string data: (Optional) See decode() |
|||
:param string password: (Optional) Password to use while encoding/decoding messages |
|||
""" |
|||
_requiredHeaders = [ |
|||
'Subscriber-ID', |
|||
'Subscriber-Name', |
|||
] |
|||
|
|||
def __init__(self, data=None, password=None): |
|||
_GNTPBase.__init__(self, 'SUBSCRIBE') |
|||
if data: |
|||
self.decode(data, password) |
|||
else: |
|||
self.set_password(password) |
|||
|
|||
|
|||
class GNTPOK(_GNTPBase): |
|||
"""Represents a GNTP OK Response |
|||
|
|||
:param string data: (Optional) See _GNTPResponse.decode() |
|||
:param string action: (Optional) Set type of action the OK Response is for |
|||
""" |
|||
_requiredHeaders = ['Response-Action'] |
|||
|
|||
def __init__(self, data=None, action=None): |
|||
_GNTPBase.__init__(self, '-OK') |
|||
if data: |
|||
self.decode(data) |
|||
if action: |
|||
self.add_header('Response-Action', action) |
|||
|
|||
|
|||
class GNTPError(_GNTPBase): |
|||
"""Represents a GNTP Error response |
|||
|
|||
:param string data: (Optional) See _GNTPResponse.decode() |
|||
:param string errorcode: (Optional) Error code |
|||
:param string errordesc: (Optional) Error Description |
|||
""" |
|||
_requiredHeaders = ['Error-Code', 'Error-Description'] |
|||
|
|||
def __init__(self, data=None, errorcode=None, errordesc=None): |
|||
_GNTPBase.__init__(self, '-ERROR') |
|||
if data: |
|||
self.decode(data) |
|||
if errorcode: |
|||
self.add_header('Error-Code', errorcode) |
|||
self.add_header('Error-Description', errordesc) |
|||
|
|||
def error(self): |
|||
return self.headers['Error-Code'], self.headers['Error-Description'] |
|||
|
|||
|
|||
def parse_gntp(data, password=None): |
|||
"""Attempt to parse a message as a GNTP message |
|||
|
|||
:param string data: Message to be parsed |
|||
:param string password: Optional password to be used to verify the message |
|||
""" |
|||
match = GNTP_INFO_LINE_SHORT.match(data) |
|||
if not match: |
|||
raise ParseError('INVALID_GNTP_INFO') |
|||
info = match.groupdict() |
|||
if info['messagetype'] == 'REGISTER': |
|||
return GNTPRegister(data, password=password) |
|||
elif info['messagetype'] == 'NOTIFY': |
|||
return GNTPNotice(data, password=password) |
|||
elif info['messagetype'] == 'SUBSCRIBE': |
|||
return GNTPSubscribe(data, password=password) |
|||
elif info['messagetype'] == '-OK': |
|||
return GNTPOK(data) |
|||
elif info['messagetype'] == '-ERROR': |
|||
return GNTPError(data) |
|||
raise ParseError('INVALID_GNTP_MESSAGE') |
@ -0,0 +1,208 @@ |
|||
""" |
|||
The gntp.notifier module is provided as a simple way to send notifications |
|||
using GNTP |
|||
|
|||
.. note:: |
|||
This class is intended to mostly mirror the older Python bindings such |
|||
that you should be able to replace instances of the old bindings with |
|||
this class. |
|||
`Original Python bindings <http://code.google.com/p/growl/source/browse/Bindings/python/Growl.py>`_ |
|||
|
|||
""" |
|||
import gntp |
|||
import socket |
|||
import logging |
|||
import platform |
|||
|
|||
__all__ = [ |
|||
'mini', |
|||
'GrowlNotifier', |
|||
] |
|||
|
|||
logger = logging.getLogger(__name__) |
|||
|
|||
|
|||
def mini(description, applicationName = 'PythonMini', noteType = "Message", |
|||
title = "Mini Message", applicationIcon = None, hostname = 'localhost', |
|||
password = None, port = 23053, sticky = False, priority = None): |
|||
"""Single notification function |
|||
|
|||
Simple notification function in one line. Has only one required parameter |
|||
and attempts to use reasonable defaults for everything else |
|||
:param string description: Notification message |
|||
""" |
|||
growl = GrowlNotifier( |
|||
applicationName = applicationName, |
|||
notifications = [noteType], |
|||
defaultNotifications = [noteType], |
|||
hostname = hostname, |
|||
password = password, |
|||
port = port, |
|||
) |
|||
result = growl.register() |
|||
if result is not True: |
|||
return result |
|||
|
|||
return growl.notify( |
|||
noteType = noteType, |
|||
title = title, |
|||
description = description, |
|||
icon = applicationIcon, |
|||
sticky = sticky, |
|||
priority = priority, |
|||
) |
|||
|
|||
|
|||
class GrowlNotifier(object): |
|||
"""Helper class to simplfy sending Growl messages |
|||
|
|||
:param string applicationName: Sending application name |
|||
:param list notification: List of valid notifications |
|||
:param list defaultNotifications: List of notifications that should be enabled |
|||
by default |
|||
:param string applicationIcon: Icon URL |
|||
:param string hostname: Remote host |
|||
:param integer port: Remote port |
|||
""" |
|||
|
|||
passwordHash = 'MD5' |
|||
|
|||
def __init__(self, applicationName = 'Python GNTP', notifications = [], |
|||
defaultNotifications = None, applicationIcon = None, hostname = 'localhost', |
|||
password = None, port = 23053): |
|||
|
|||
self.applicationName = applicationName |
|||
self.notifications = list(notifications) |
|||
if defaultNotifications: |
|||
self.defaultNotifications = list(defaultNotifications) |
|||
else: |
|||
self.defaultNotifications = self.notifications |
|||
self.applicationIcon = applicationIcon |
|||
|
|||
self.password = password |
|||
self.hostname = hostname |
|||
self.port = int(port) |
|||
|
|||
def _checkIcon(self, data): |
|||
''' |
|||
Check the icon to see if it's valid |
|||
@param data: |
|||
@todo Consider checking for a valid URL |
|||
''' |
|||
return data |
|||
|
|||
def register(self): |
|||
"""Send GNTP Registration |
|||
|
|||
.. warning:: |
|||
Before sending notifications to Growl, you need to have |
|||
sent a registration message at least once |
|||
""" |
|||
logger.info('Sending registration to %s:%s', self.hostname, self.port) |
|||
register = gntp.GNTPRegister() |
|||
register.add_header('Application-Name', self.applicationName) |
|||
for notification in self.notifications: |
|||
enabled = notification in self.defaultNotifications |
|||
register.add_notification(notification, enabled) |
|||
if self.applicationIcon: |
|||
register.add_header('Application-Icon', self.applicationIcon) |
|||
if self.password: |
|||
register.set_password(self.password, self.passwordHash) |
|||
self.add_origin_info(register) |
|||
self.register_hook(register) |
|||
return self._send('register', register) |
|||
|
|||
def notify(self, noteType, title, description, icon = None, sticky = False, priority = None): |
|||
"""Send a GNTP notifications |
|||
|
|||
.. warning:: |
|||
Must have registered with growl beforehand or messages will be ignored |
|||
|
|||
:param string noteType: One of the notification names registered earlier |
|||
:param string title: Notification title (usually displayed on the notification) |
|||
:param string description: The main content of the notification |
|||
:param string icon: Icon URL path |
|||
:param boolean sticky: Sticky notification |
|||
:param integer priority: Message priority level from -2 to 2 |
|||
""" |
|||
logger.info('Sending notification [%s] to %s:%s', noteType, self.hostname, self.port) |
|||
assert noteType in self.notifications |
|||
notice = gntp.GNTPNotice() |
|||
notice.add_header('Application-Name', self.applicationName) |
|||
notice.add_header('Notification-Name', noteType) |
|||
notice.add_header('Notification-Title', title) |
|||
if self.password: |
|||
notice.set_password(self.password, self.passwordHash) |
|||
if sticky: |
|||
notice.add_header('Notification-Sticky', sticky) |
|||
if priority: |
|||
notice.add_header('Notification-Priority', priority) |
|||
if icon: |
|||
notice.add_header('Notification-Icon', self._checkIcon(icon)) |
|||
if description: |
|||
notice.add_header('Notification-Text', description) |
|||
|
|||
self.add_origin_info(notice) |
|||
self.notify_hook(notice) |
|||
|
|||
return self._send('notify', notice) |
|||
|
|||
def subscribe(self, id, name, port): |
|||
"""Send a Subscribe request to a remote machine""" |
|||
sub = gntp.GNTPSubscribe() |
|||
sub.add_header('Subscriber-ID', id) |
|||
sub.add_header('Subscriber-Name', name) |
|||
sub.add_header('Subscriber-Port', port) |
|||
if self.password: |
|||
sub.set_password(self.password, self.passwordHash) |
|||
|
|||
self.add_origin_info(sub) |
|||
self.subscribe_hook(sub) |
|||
|
|||
return self._send('subscribe', sub) |
|||
|
|||
def add_origin_info(self, packet): |
|||
"""Add optional Origin headers to message""" |
|||
packet.add_header('Origin-Machine-Name', platform.node()) |
|||
packet.add_header('Origin-Software-Name', 'gntp.py') |
|||
packet.add_header('Origin-Software-Version', gntp.__version__) |
|||
packet.add_header('Origin-Platform-Name', platform.system()) |
|||
packet.add_header('Origin-Platform-Version', platform.platform()) |
|||
|
|||
def register_hook(self, packet): |
|||
pass |
|||
|
|||
def notify_hook(self, packet): |
|||
pass |
|||
|
|||
def subscribe_hook(self, packet): |
|||
pass |
|||
|
|||
def _send(self, type, packet): |
|||
"""Send the GNTP Packet""" |
|||
|
|||
data = packet.encode() |
|||
|
|||
logger.debug('To : %s:%s <%s>\n%s', self.hostname, self.port, packet.__class__, data) |
|||
|
|||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|||
s.connect((self.hostname, self.port)) |
|||
s.send(data.encode('utf8', 'replace')) |
|||
recv_data = s.recv(1024) |
|||
while not recv_data.endswith("\r\n\r\n"): |
|||
recv_data += s.recv(1024) |
|||
response = gntp.parse_gntp(recv_data) |
|||
s.close() |
|||
|
|||
logger.debug('From : %s:%s <%s>\n%s', self.hostname, self.port, response.__class__, response) |
|||
|
|||
if response.info['messagetype'] == '-OK': |
|||
return True |
|||
logger.error('Invalid response: %s', response.error()) |
|||
return response.error() |
|||
|
|||
if __name__ == '__main__': |
|||
# If we're running this module directly we're likely running it as a test |
|||
# so extra debugging is useful |
|||
logging.basicConfig(level = logging.INFO) |
|||
mini('Testing mini notification') |
Loading…
Reference in new issue