Edit file File name : openprinting.py Content :#!/usr/bin/python3 ## system-config-printer ## Copyright (C) 2008, 2011, 2014 Red Hat, Inc. ## Copyright (C) 2008 Till Kamppeter <till.kamppeter@gmail.com> ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## You should have received a copy of the GNU General Public License ## along with this program; if not, write to the Free Software ## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. import requests, urllib.request, urllib.parse, urllib.error, platform, threading, tempfile, traceback import os, sys from xml.etree.ElementTree import XML from . import Device from . import _debugprint __all__ = ['OpenPrinting'] def _normalize_space (text): result = text.strip () result = result.replace ('\n', ' ') i = result.find (' ') while i != -1: result = result.replace (' ', ' ') i = result.find (' ') return result class _QueryThread (threading.Thread): def __init__ (self, parent, parameters, callback, user_data=None): threading.Thread.__init__ (self) self.parent = parent self.parameters = parameters self.callback = callback self.user_data = user_data self.result = b'' self.setDaemon (True) _debugprint ("+%s" % self) def __del__ (self): _debugprint ("-%s" % self) def run (self): # CGI script to be executed query_command = "/query.cgi" # Headers for the post request headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"} params = ("%s&uilanguage=%s&locale=%s" % (urllib.parse.urlencode (self.parameters), self.parent.language[0], self.parent.language[0])) self.url = "https://%s%s?%s" % (self.parent.base_url, query_command, params) # Send request result = None self.result = b'' status = 1 try: req = requests.get(self.url, verify=True) self.result = req.content status = 0 except: self.result = sys.exc_info () if status is None: status = 0 _debugprint ("%s: query complete" % self) if self.callback is not None: self.callback (status, self.user_data, self.result) class OpenPrinting: def __init__(self, language=None): """ @param language: language, as given by the first element of locale.setlocale(). @type language: string """ if language is None: import locale try: language = locale.getlocale(locale.LC_MESSAGES) except locale.Error: language = 'C' self.language = language # XXX Read configuration file. self.base_url = "www.openprinting.org" # Restrictions on driver choices XXX Parameters to be taken from # config file self.onlyfree = 1 self.onlymanufacturer = 0 _debugprint ("OpenPrinting: Init %s %s %s" % (self.language, self.onlyfree, self.onlymanufacturer)) _debugprint ("+%s" % self) def __del__ (self): _debugprint ("-%s" % self) def cancelOperation(self, handle): """ Cancel an operation. @param handle: query/operation handle """ # Just prevent the callback. try: handle.callback = None except: pass def webQuery(self, parameters, callback, user_data=None): """ Run a web query for a driver. @type parameters: dict @param parameters: URL parameters @type callback: function @param callback: callback function, taking (integer, user_data, string) parameters with the first parameter being the status code, zero for success @return: query handle """ the_thread = _QueryThread (self, parameters, callback, user_data) the_thread.start() return the_thread def searchPrinters(self, searchterm, callback, user_data=None): """ Search for printers using a search term. @type searchterm: string @param searchterm: search term @type callback: function @param callback: callback function, taking (integer, user_data, string) parameters with the first parameter being the status code, zero for success @return: query handle """ def parse_result (status, data, result): (callback, user_data) = data if status != 0: callback (status, user_data, result) return status = 0 printers = {} try: root = XML (result) # We store the printers as a dict of: # foomatic_id: displayname for printer in root.findall ("printer"): id = printer.find ("id") make = printer.find ("make") model = printer.find ("model") if id is not None and make is not None and model is not None: idtxt = id.text maketxt = make.text modeltxt = model.text if idtxt and maketxt and modeltxt: printers[idtxt] = maketxt + " " + modeltxt except: status = 1 printers = sys.exc_info () _debugprint ("searchPrinters/parse_result: OpenPrinting entries: %s" % repr(printers)) try: callback (status, user_data, printers) except: (type, value, tb) = sys.exc_info () tblast = traceback.extract_tb (tb, limit=None) if len (tblast): tblast = tblast[:len (tblast) - 1] extxt = traceback.format_exception_only (type, value) for line in traceback.format_tb(tb): print (line.strip ()) print (extxt[0].strip ()) # Common parameters for the request params = { 'type': 'printers', 'printer': searchterm, 'format': 'xml' } _debugprint ("searchPrinters: Querying OpenPrinting: %s" % repr(params)) return self.webQuery(params, parse_result, (callback, user_data)) def listDrivers(self, model, callback, user_data=None, extra_options=None): """ Obtain a list of printer drivers. @type model: string or cupshelpers.Device @param model: foomatic printer model string or a cupshelpers.Device object @type callback: function @param callback: callback function, taking (integer, user_data, string) parameters with the first parameter being the status code, zero for success @type extra_options: string -> string dictionary @param extra_options: Additional search options, see http://www.linuxfoundation.org/en/OpenPrinting/Database/Query @return: query handle """ def parse_result (status, data, result): (callback, user_data) = data if status != 0: callback (status, user_data, result) try: # filter out invalid UTF-8 to avoid breaking the XML parser result = result.decode('UTF-8', errors='replace').encode('UTF-8') root = XML (result) drivers = {} # We store the drivers as a dict of: # foomatic_id: # { 'name': name, # 'url': url, # 'supplier': supplier, # 'license': short license string e.g. GPLv2, # 'licensetext': license text (Plain text), # 'nonfreesoftware': Boolean, # 'thirdpartysupplied': Boolean, # 'manufacturersupplied': Boolean, # 'patents': Boolean, # 'supportcontacts' (optional): # list of { 'name', # 'url', # 'level', # } # 'shortdescription': short description, # 'recommended': Boolean, # 'functionality': # { 'text': integer percentage, # 'lineart': integer percentage, # 'graphics': integer percentage, # 'photo': integer percentage, # 'speed': integer percentage, # } # 'packages' (optional): # { arch: # { file: # { 'url': url, # 'fingerprint': signature key fingerprint URL # 'realversion': upstream version string, # 'version': packaged version string, # 'release': package release string # } # } # } # 'ppds' (optional): # URL string list # } # There is more information in the raw XML, but this # can be added to the Python structure as needed. for driver in root.findall ('driver'): id = driver.attrib.get ('id') if id is None: continue dict = {} for attribute in ['name', 'url', 'supplier', 'license', 'shortdescription' ]: element = driver.find (attribute) if element is not None and element.text is not None: dict[attribute] = _normalize_space (element.text) element = driver.find ('licensetext') if element is not None and element.text is not None: dict['licensetext'] = element.text if not 'licensetext' in dict or \ dict['licensetext'] is None: element = driver.find ('licenselink') if element is not None: license_url = element.text if license_url is not None: try: req = requests.get(license_url, verify=True) dict['licensetext'] = \ req.content.decode("utf-8") except: _debugprint('Cannot retrieve %s' % license_url) for boolean in ['nonfreesoftware', 'recommended', 'patents', 'thirdpartysupplied', 'manufacturersupplied']: dict[boolean] = driver.find (boolean) is not None # Make a 'freesoftware' tag for compatibility with # how the OpenPrinting API used to work (see trac # #74). dict['freesoftware'] = not dict['nonfreesoftware'] supportcontacts = [] container = driver.find ('supportcontacts') if container is not None: for sc in container.findall ('supportcontact'): supportcontact = {} if sc.text is not None: supportcontact['name'] = \ _normalize_space (sc.text) else: supportcontact['name'] = "" supportcontact['url'] = sc.attrib.get ('url') supportcontact['level'] = sc.attrib.get ('level') supportcontacts.append (supportcontact) if supportcontacts: dict['supportcontacts'] = supportcontacts if 'name' not in dict or 'url' not in dict: continue container = driver.find ('functionality') if container is not None: functionality = {} for attribute in ['text', 'lineart', 'graphics', 'photo', 'speed']: element = container.find (attribute) if element is not None: functionality[attribute] = element.text if functionality: dict[container.tag] = functionality packages = {} container = driver.find ('packages') if container is not None: for arch in container.getchildren (): rpms = {} for package in arch.findall ('package'): rpm = {} for attribute in ['realversion','version', 'release', 'url', 'pkgsys', 'fingerprint']: element = package.find (attribute) if element is not None: rpm[attribute] = element.text repositories = package.find ('repositories') if repositories is not None: for pkgsys in repositories.getchildren (): rpm.setdefault('repositories', {})[pkgsys.tag] = pkgsys.text rpms[package.attrib['file']] = rpm packages[arch.tag] = rpms if packages: dict['packages'] = packages ppds = [] container = driver.find ('ppds') if container is not None: for each in container.getchildren (): ppds.append (each.text) if ppds: dict['ppds'] = ppds drivers[id] = dict _debugprint ("listDrivers/parse_result: OpenPrinting entries: %s" % repr(drivers)) callback (0, user_data, drivers) except: callback (1, user_data, sys.exc_info ()) if isinstance(model, Device): model = model.id architecture = platform.machine() # On Intel, we could be running a 32bit user space with a 64bit kernel, in # which case platform.machine() will return x86_64, leading to downloading # the wrong printer driver, so we make sure we ask for i386 in that case. if architecture == 'x86_64' and platform.architecture()[0] == '32bit': architecture = 'i386' params = { 'type': 'drivers', 'moreinfo': '1', 'showprinterid': '1', 'onlynewestdriverpackages': '1', 'architectures': architecture, 'noobsoletes': '1', 'onlyfree': str (self.onlyfree), 'onlymanufacturer': str (self.onlymanufacturer), 'printer': model, 'format': 'xml'} if extra_options: params.update(extra_options) _debugprint ("listDrivers: Querying OpenPrinting: %s" % repr(params)) return self.webQuery(params, parse_result, (callback, user_data)) def _simple_gui (): from gi.repository import Gdk from gi.repository import Gtk import pprint Gdk.threads_init () class QueryApp: def __init__(self): self.openprinting = OpenPrinting() self.main = Gtk.Dialog (title="OpenPrinting query application", transient_for=None, modal=True) self.main.add_buttons (Gtk.STOCK_CLOSE, Gtk.ResponseType.CLOSE, "Search", 10, "List", 20) self.main.set_border_width (6) self.main.vbox.set_spacing (2) vbox = Gtk.VBox.new (False, 6) self.main.vbox.pack_start (vbox, True, True, 0) vbox.set_border_width (6) self.entry = Gtk.Entry () vbox.pack_start (self.entry, False, False, 6) sw = Gtk.ScrolledWindow () self.tv = Gtk.TextView () sw.add (self.tv) vbox.pack_start (sw, True, True, 6) self.main.connect ("response", self.response) self.main.show_all () def response (self, dialog, response): if (response == Gtk.ResponseType.CLOSE or response == Gtk.ResponseType.DELETE_EVENT): Gtk.main_quit () if response == 10: # Run a query. self.openprinting.searchPrinters (self.entry.get_text (), self.search_printers_callback) if response == 20: self.openprinting.listDrivers (self.entry.get_text (), self.list_drivers_callback) def search_printers_callback (self, status, user_data, printers): if status != 0: raise printers[1] text = "" for printer in printers.values (): text += printer + "\n" Gdk.threads_enter () self.tv.get_buffer ().set_text (text) Gdk.threads_leave () def list_drivers_callback (self, status, user_data, drivers): if status != 0: raise drivers[1] text = pprint.pformat (drivers) Gdk.threads_enter () self.tv.get_buffer ().set_text (text) Gdk.threads_leave () def query_callback (self, status, user_data, result): Gdk.threads_enter () self.tv.get_buffer ().set_text (str (result)) open ("result.xml", "w").write (str (result)) Gdk.threads_leave () q = QueryApp() Gtk.main () Save