Edgewall Software

source: plugins/1.0/spam-filter/tracspamfilter/filters/akismet.py

Last change on this file was 15643, checked in by Ryan J Ollos, 7 years ago

1.0.11dev: Fix UnicodeEncodeError submitting content to Akismet

Refs #12715.

  • Property svn:eol-style set to native
File size: 6.0 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2005-2006 Edgewall Software
4# Copyright (C) 2005-2006 Matthew Good <trac@matt-good.net>
5# Copyright (C) 2006 Christopher Lenz <cmlenz@gmx.de>
6# All rights reserved.
7#
8# This software is licensed as described in the file COPYING, which
9# you should have received as part of this distribution. The terms
10# are also available at http://trac.edgewall.com/license.html.
11#
12# This software consists of voluntary contributions made by many
13# individuals. For the exact contribution history, see the revision
14# history and logs, available at http://projects.edgewall.com/trac/.
15#
16# Author: Matthew Good <trac@matt-good.net>
17# Christopher Lenz <cmlenz@gmx.de>
18
19import urllib2
20from email.Utils import parseaddr
21from pkg_resources import get_distribution
22
23from trac import __version__ as TRAC_VERSION
24from trac.config import IntOption, Option
25from trac.core import Component, implements
26from trac.mimeview.api import is_binary
27from trac.util.text import unicode_urlencode
28
29from tracspamfilter.api import IFilterStrategy, N_
30
31
32class AkismetFilterStrategy(Component):
33 """Spam filter using the Akismet service (http://akismet.com/).
34
35 Based on the `akismet` Python module written by Michael Ford:
36 http://www.voidspace.org.uk/python/modules.shtml#akismet
37 """
38 implements(IFilterStrategy)
39
40 noheaders = ['HTTP_COOKIE', 'HTTP_HOST', 'HTTP_REFERER',
41 'HTTP_USER_AGENT', 'HTTP_AUTHORIZATION']
42
43 karma_points = IntOption('spam-filter', 'akismet_karma', '10',
44 """By how many points an Akismet reject impacts the overall karma of
45 a submission.""", doc_domain='tracspamfilter')
46
47 api_key = Option('spam-filter', 'akismet_api_key', '',
48 """Wordpress key required to use the Akismet API.""",
49 doc_domain='tracspamfilter')
50
51 api_url = Option('spam-filter', 'akismet_api_url',
52 'rest.akismet.com/1.1/', """URL of the Akismet service.""",
53 doc_domain='tracspamfilter')
54
55 user_agent = 'Trac/%s | SpamFilter/%s' % (
56 TRAC_VERSION, get_distribution('TracSpamFilter').version)
57
58 def __init__(self):
59 self.verified_key = None
60 self.verified = None
61
62 # IFilterStrategy methods
63
64 def is_external(self):
65 return True
66
67 def test(self, req, author, content, ip):
68 if not self._check_preconditions(req, author, content):
69 return
70
71 try:
72 url = 'http://%s.%scomment-check' % (self.api_key, self.api_url)
73 self.log.debug("Checking content with Akismet service at %s", url)
74 resp = self._post(url, req, author, content, ip)
75 if resp.strip().lower() != 'false':
76 self.log.debug("Akismet says content is spam")
77 return -abs(self.karma_points), \
78 N_("Akismet says content is spam")
79
80 except urllib2.URLError, e:
81 self.log.warn("Akismet request failed (%s)", e)
82
83 def train(self, req, author, content, ip, spam=True):
84 if not self._check_preconditions(req, author, content):
85 return -2
86
87 try:
88 which = spam and 'spam' or 'ham'
89 url = 'http://%s.%ssubmit-%s' \
90 % (self.api_key, self.api_url, which)
91 self.log.debug("Submitting %s to Akismet service at %s",
92 which, url)
93 self._post(url, req, author, content, ip)
94 return 1
95 except urllib2.URLError, e:
96 self.log.warn("Akismet request failed (%s)", e)
97 return -1
98
99 # Internal methods
100
101 def _check_preconditions(self, req, author, content):
102 if self.karma_points == 0:
103 return False
104
105 if not self.api_key:
106 self.log.warning("Akismet API key is missing")
107 return False
108
109 if is_binary(content):
110 self.log.warning("Content is binary, Akismet content check "
111 "skipped")
112 return False
113
114 try:
115 if not self.verify_key(req):
116 self.log.warning("Akismet API key is invalid")
117 return False
118 return True
119 except urllib2.URLError, e:
120 self.log.warn("Akismet request failed (%s)", e)
121
122 def verify_key(self, req, api_url=None, api_key=None):
123 if api_url is None:
124 api_url = self.api_url
125 if api_key is None:
126 api_key = self.api_key
127
128 if api_key != self.verified_key:
129 self.log.debug("Verifying Akismet API key")
130 params = {'blog': req.base_url, 'key': api_key}
131 req = urllib2.Request('http://%sverify-key' % api_url,
132 unicode_urlencode(params),
133 {'User-Agent': self.user_agent})
134 resp = urllib2.urlopen(req).read()
135 if resp.strip().lower() == 'valid':
136 self.log.debug("Akismet API key is valid")
137 self.verified = True
138 self.verified_key = api_key
139
140 return self.verified_key is not None
141
142 def _post(self, url, req, author, content, ip):
143 # Split up author into name and email, if possible
144 author_name, author_email = parseaddr(author)
145 if not author_name and not author_email:
146 author_name = author
147 elif not author_name and author_email.find('@') < 1:
148 author_name = author
149 author_email = None
150
151 params = {
152 'blog': req.base_url,
153 'user_ip': ip,
154 'user_agent': req.get_header('User-Agent'),
155 'referrer': req.get_header('Referer') or 'unknown',
156 'comment_author': author_name,
157 'comment_type': 'trac',
158 'comment_content': content
159 }
160 if author_email:
161 params['comment_author_email'] = author_email
162 for k, v in req.environ.items():
163 if k.startswith('HTTP_') and k not in self.noheaders:
164 params[k] = v
165 urlreq = urllib2.Request(url, unicode_urlencode(params),
166 {'User-Agent': self.user_agent})
167
168 resp = urllib2.urlopen(urlreq)
169 return resp.read()
Note: See TracBrowser for help on using the repository browser.