diff --git a/README.md b/README.md index 1f9db47..2041cd5 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ authentication = linkedin.LinkedInAuthentication(API_KEY, API_SECRET, RETURN_URL # It can be used to track your user state or something else (it's up to you) # Be aware that this value is sent to OAuth server AS IS - make sure to encode or hash it #authorization.state = 'your_encoded_message' -print authentication.authorization_url # open this url on your browser +print(authentication.authorization_url) # open this url on your browser application = linkedin.LinkedInApplication(authentication) ``` When you grant access to the application, you will be redirected to the return url with the following query strings appended to your **RETURN_URL**: @@ -93,7 +93,27 @@ This means that the value of the **authorization_code** is **AQTXrv3Pe1iWS0EQvLg ```python authentication.authorization_code = 'AQTXrv3Pe1iWS0EQvLg0NJA8ju_XuiadXACqHennhWih7iRyDSzAm5jaf3R7I8' -authentication.get_access_token() +token = authentication.get_access_token() +``` + +you can retrieve the access token though the namedtuple returned by the get_access_token method + +```python +print(token.access_token) +print(token.expires_in) +``` + +optionally, LinkedIn supports programmatic refresh tokens for all approved Marketing Developer Platform (MDP) partners, so +you can obtain a new access token passing a refresh token to the refresh_access_token method, more info [here](https://docs.microsoft.com/en-us/linkedin/shared/authentication/programmatic-refresh-tokens) + +```python +refresh_token = token.refresh_token # the token previously obtained via Oauth +authentication = linkedin.LinkedInAuthentication(API_KEY, API_SECRET) +new_token = authentication.refresh_access_token(refresh_token) +print(new_token.access_token) +print(new_token.expires_in) +print(new_token.refresh_token) +print(new_token.refresh_token_expires_in) ``` After you get the access token, you are now permitted to make API calls on behalf of the user who granted access to you app. In addition to that, in order to prevent from going through the OAuth flow for every consecutive request, diff --git a/linkedin_v2/linkedin.py b/linkedin_v2/linkedin.py index 40e1590..90d309c 100644 --- a/linkedin_v2/linkedin.py +++ b/linkedin_v2/linkedin.py @@ -12,8 +12,10 @@ import requests from requests_oauthlib import OAuth1 -from .models import AccessToken +from .models import * from .utils import enum, to_utf8, raise_for_error, StringIO +import json +import urllib __all__ = ['LinkedInAuthentication', 'LinkedInApplication', 'PERMISSIONS'] @@ -37,9 +39,10 @@ GROUPS='https://api.linkedin.com/v2/groups', POSTS='https://api.linkedin.com/v2/posts', COMPANIES='https://api.linkedin.com/v2/companies', - COMPANY_SEARCH='https://api.linkedin.com/v2/company-search', + COMPANY_SEARCH='https://api.linkedin.com/v2/search?q=companiesV2', JOBS='https://api.linkedin.com/v2/jobs', - JOB_SEARCH='https://api.linkedin.com/v2/job-search') + JOB_SEARCH='https://api.linkedin.com/v2/job-search', + SHARE='https://api.linkedin.com/v2/shares') NETWORK_UPDATES = enum('NetworkUpdate', APPLICATION='APPS', @@ -72,6 +75,9 @@ def __init__(self, consumer_key, consumer_secret, user_token, user_secret, self.permissions = permissions +REDIRECT_URI_ERROR_MESSAGE = 'You must init LinkedInAuthentication with a redirect_url' +AUTHORIZATION_CODE_ERROR_MESSAGE = 'You must first get the authorization code' + class LinkedInAuthentication(object): """ Implements a standard OAuth 2.0 flow that involves redirection for users to @@ -80,7 +86,7 @@ class LinkedInAuthentication(object): AUTHORIZATION_URL = 'https://www.linkedin.com/uas/oauth2/authorization' ACCESS_TOKEN_URL = 'https://www.linkedin.com/uas/oauth2/accessToken' - def __init__(self, key, secret, redirect_uri, permissions=None): + def __init__(self, key, secret, redirect_uri=None, permissions=None): self.key = key self.secret = secret self.redirect_uri = redirect_uri @@ -92,6 +98,7 @@ def __init__(self, key, secret, redirect_uri, permissions=None): @property def authorization_url(self): + assert self.redirect_uri, REDIRECT_URI_ERROR_MESSAGE qd = {'response_type': 'code', 'client_id': self.key, 'scope': (' '.join(self.permissions)).strip(), @@ -108,11 +115,28 @@ def last_error(self): def _make_new_state(self): return hashlib.md5( - '{}{}'.format(random.randrange(0, 2 ** 63), self.secret).encode("utf8") + '{}{}'.format(random.randrange(0, 2 ** 63), + self.secret).encode("utf8") ).hexdigest() - def get_access_token(self, timeout=60): - assert self.authorization_code, 'You must first get the authorization code' + @staticmethod + def _get_token_from_response(response): + response = response.json() + access_token = response.get(ACCESS_TOKEN_KEY) + expires_in = response.get(EXPIRES_IN_KEY) + refresh_token = response.get(REFRESH_TOKEN_KEY) + refresh_token_expires_in = response.get(REFRESH_TOKEN_EXPIRES_IN_KEY) + + return AccessToken( + access_token, + expires_in, + refresh_token or None, + refresh_token_expires_in or None + ) + + def get_access_token(self, timeout=TIMEOUT): + assert self.authorization_code, AUTHORIZATION_CODE_ERROR_MESSAGE + assert self.redirect_uri, REDIRECT_URI_ERROR_MESSAGE qd = {'grant_type': 'authorization_code', 'code': self.authorization_code, 'redirect_uri': self.redirect_uri, @@ -120,8 +144,27 @@ def get_access_token(self, timeout=60): 'client_secret': self.secret} response = requests.post(self.ACCESS_TOKEN_URL, data=qd, timeout=timeout) raise_for_error(response) - response = response.json() - self.token = AccessToken(response['access_token'], response['expires_in']) + + self.token = self._get_token_from_response(response) + return self.token + + def refresh_access_token(self, refresh_token, timeout=TIMEOUT): + """ + Exchanges a Refresh Token for a New Access Token + :param refresh_token: str + :param timeout: int + :return: AccessToken + """ + qd = { + 'grant_type': 'refresh_token', + REFRESH_TOKEN_KEY: refresh_token, + 'client_id': self.key, + 'client_secret': self.secret + } + response = requests.post(self.ACCESS_TOKEN_URL, data=qd, timeout=timeout) + raise_for_error(response) + + self.token = self._get_token_from_response(response) return self.token @@ -147,14 +190,16 @@ def __init__(self, authentication=None, token=None): self.authentication = authentication if not self.authentication: self.authentication = LinkedInAuthentication('', '', '') - self.authentication.token = AccessToken(token, None) + self.authentication.token = AccessToken(token, None, None, None) def make_request(self, method, url, data=None, params=None, headers=None, - timeout=60): + timeout=TIMEOUT): if headers is None: - headers = {'x-li-format': 'json', 'Content-Type': 'application/json'} + headers = {'x-li-format': 'json', + 'Content-Type': 'application/json'} else: - headers.update({'x-li-format': 'json', 'Content-Type': 'application/json'}) + headers.update( + {'x-li-format': 'json', 'Content-Type': 'application/json'}) if params is None: params = {} @@ -167,17 +212,25 @@ def make_request(self, method, url, data=None, params=None, headers=None, self.authentication.user_token, self.authentication.user_secret) kw.update({'auth': auth}) else: - params.update({'oauth2_access_token': self.authentication.token.access_token}) + params.update( + {'oauth2_access_token': self.authentication.token.access_token}) return requests.request(method.upper(), url, **kw) + def make_get_request(self, url): + print(url) + response = self.make_request('GET', url) + # raise_for_error(response) + return response.json() + def get_connections(self, totals_only=None, params=None, headers=None): count = '50' if totals_only: count = '0' url = '%s?q=viewer&start=0&count=%s' % (ENDPOINTS.CONNECTIONS, count) - response = self.make_request('GET', url, params=params, headers=headers) - raise_for_error(response) + response = self.make_request( + 'GET', url, params=params, headers=headers) + # raise_for_error(response) return response.json() def get_profile(self, member_id=None, member_url=None, selectors=None, @@ -187,10 +240,296 @@ def get_profile(self, member_id=None, member_url=None, selectors=None, connections_response = self.get_connections(totals_only=True) connections_body = connections_response.get('paging', None) connections = connections_body.get('total', 0) - + url = '%s/me' % ENDPOINTS.BASE - response = self.make_request('GET', url, params=params, headers=headers) + response = self.make_request( + 'GET', url, params=params, headers=headers) raise_for_error(response) json_response = response.json() json_response.update({'numConnections': connections}) return json_response + + def get_shares(self, totals_only=None, params=None, headers=None): + count = '100' + if totals_only: + count = '0' + url = '%s?q=owners&owners=urn:li:person:%s&sharesPerOwner=100' % ( + ENDPOINTS.SHARE, self.get_profile()['id']) + response = self.make_request( + 'GET', url, params=params, headers=headers) + raise_for_error(response) + return response.json() + + def search_profile(self, params): + url = '%s/clientAwareMemberHandles?q=handleString&%s&projection=(elements*(member~))' % ( + ENDPOINTS.BASE, urllib.parse.urlencode(params)) + print(url) + response = self.make_request('GET', url) + raise_for_error(response) + return response.json() + + def post_share(self, post_type='person', company_id=None, title=None, description=None, + submitted_url=None, submitted_image_url=None): + + if post_type == 'organization': + post_owner = "urn:li:organization:%s" % company_id + else: + post_owner = "urn:li:person:%s" % self.get_profile()['id'] + + post = { + "owner": post_owner, + "text": { + "text": "" + }, + "subject": "", + "distribution": { + "linkedInDistributionTarget": {} + }, + "content": { + "contentEntities": [], + "title": "" + } + } + + content_entity = { + "entityLocation": "", + "thumbnails": [] + } + + if title is not None: + post['content']['title'] = title + post['subject'] = title + + if description is not None: + post['text']['text'] = description + + if submitted_url is not None: + content_entity['entityLocation'] = submitted_url + + if submitted_image_url is not None: + content_entity['thumbnails'] = [{"imageSpecificContent": {}, "resolvedUrl": submitted_image_url}] + + post['content']['contentEntities'] = [content_entity] + + response = self.make_request( + 'POST', ENDPOINTS.SHARE, data=json.dumps(post)) + return response.json() + + def search_company(self, params): + url = '%s/search?q=companiesV2&%s' % ( + ENDPOINTS.BASE, urllib.parse.urlencode(params)) + response = self.make_request('GET', url) + # raise_for_error(response) + return response.json() + + def get_organization(self, organization_id, params=None, headers=None): + url = '%s/organizations/%s' % (ENDPOINTS.BASE, organization_id) + response = self.make_request( + 'GET', url, params=params, headers=headers) + raise_for_error(response) + return response.json() + + def get_brand(self, brand_id, params=None, headers=None): + url = '%s/organizationBrands/%s' % (ENDPOINTS.BASE, brand_id) + response = self.make_request( + 'GET', url, params=params, headers=headers) + raise_for_error(response) + return response.json() + + def send_invitation(self, invitee_email): + post = { + "invitee": "urn:li:email:%s" % invitee_email, + "message": { + "com.linkedin.invitations.InvitationMessage": { + "body": "Let's connect!" + } + } + } + response = self.make_request( + 'POST', '%s/invitations' % ENDPOINTS.BASE, data=json.dumps(post)) + raise_for_error(response) + return response.json() + + def search_job(self): + url = '%s/recommendedJobs?q=byMember' % ENDPOINTS.BASE + return self.make_get_request(url) + + def get_job(self, **kwargs): + return self.search_job() + + def get_post_comments(self, selectors, params=None, **kwargs): + url = '%s/socialActions/urn:li:share:%s/comments' % ( + ENDPOINTS.BASE, kwargs['post_id']) + print(url) + response = self.make_request( + 'GET', url, params=params) + raise_for_error(response) + return response.json() + + def get_group(self, group_id, params=None, headers=None): + url = '%s/groupDefinitions/%s' % (ENDPOINTS.BASE, group_id) + response = self.make_request( + 'GET', url, params=params, headers=headers) + raise_for_error(response) + return response.json() + + def get_group_by_ids(self, group_ids, params=None, headers=None): + url = '%s/groupDefinitions/?ids=List(%s)' % (ENDPOINTS.BASE, group_ids) + response = self.make_request( + 'GET', url, params=params, headers=headers) + raise_for_error(response) + return response.json() + + def get_memberships(self, group_id): + post = { + "action": "SEND_REQUEST", + "group": "urn:li:group:%s" % group_id, + "members": [ + "urn:li:person:%s" % self.get_profile['id'], + ] + } + response = self.make_request( + 'POST', '%s/groupMemberships?action=membershipAction' % ENDPOINTS.BASE, data=json.dumps(post)) + raise_for_error(response) + return response.json() + + def submit_group_post(self, group_id, title, description, + shareCommentary): + post = { + "author": "urn:li:person:%s" % self.get_profile()['id'], + "containerEntity": "urn:li:group:%s" % group_id, + "lifecycleState": "PUBLISHED", + "specificContent": { + "com.linkedin.ugc.ShareContent": { + "media": [ + { + "title": { + "attributes": [], + "text": title + }, + "description": { + "attributes": [], + "text": description + }, + "thumbnails": [], + "status": "READY" + } + ], + "shareCommentary": { + "attributes": [], + "text": "" + } + } + }, + "visibility": { + "com.linkedin.ugc.MemberNetworkVisibility": "CONTAINER" + } + } + if shareCommentary is not None: + post['specificContent']['shareCommentary'] = shareCommentary + response = self.make_request( + 'POST', ENDPOINTS.SHARE, data=json.dumps(post)) + raise_for_error(response) + return response.json() + + def get_company_updates(self, organization_id, post): + url = '%s/people/id=%s/organizations/%s' % ( + ENDPOINTS.SHARE, self.get_profile['id'], organization_id) + response = self.make_request( + 'POST', url, data=json.dumps(post)) + raise_for_error(response) + return response.json() + + def get_group(self): + print(ENDPOINTS.BASE) + url = "%s/groupMemberships?q=member&member=urn:li:person:%s&membershipStatuses=List(MEMBER,OWNER)" % ( + ENDPOINTS.BASE, self.get_profile()['id']) + response = self.make_request('GET', url) + # raise_for_error(response) + return response.json() + + def submit_company_share(self, **kwargs): + submitted_url, submitted_image_url, visibility_code = None, None, None + if kwargs["submitted_url"]: + submitted_url = kwargs["submitted_url"] + if kwargs["submitted_image_url"]: + submitted_image_url = kwargs["submitted_image_url"] + if kwargs["visibility_code"]: + visibility_code = kwargs["visibility_code"] + response = self.post_share(post_type='organization', company_id=kwargs["company_id"], comment=None, + title=kwargs["title"], description=kwargs["description"], + submitted_url=submitted_url, submitted_image_url=submitted_image_url, + visibility_code=visibility_code) + return response + + def find_member_organization_access_info(self, **kwargs): + # https://api.linkedin.com/v2/organizationalEntityAcls?q=roleAssignee + url = "%s/organizationalEntityAcls?q=roleAssignee&role=ADMINISTRATOR&projection=(elements*(*,roleAssignee~(localizedFirstName, localizedLastName), organizationalTarget~(localizedName)))" % ENDPOINTS.BASE + return self.make_get_request(url) + + def find_organization_access_control_info(self, organization_id): + # https://api.linkedin.com/v2/organizationalEntityAcls?q=organizationalTarget&organizationalTarget={URN} + url = "%s/organizationalEntityAcls?q=organizationalTarget&organizationalTarget=urn:li:organization:%s" % ( + ENDPOINTS.BASE, organization_id) + return self.make_get_request(url) + + def retrieve_lifetime_follower_statistics(self, organization_id): + # https://api.linkedin.com/v2/organizationalEntityFollowerStatistics?q=organizationalEntity&organizationalEntity={organization URN} + url = "%s/organizationalEntityFollowerStatistics?q=organizationalEntity&organizationalEntity=urn:li:organization:%s" % ( + ENDPOINTS.BASE, organization_id) + print(url) + return self.make_get_request(url) + + def retrieve_time_bound_follower_statistics(self, organization_id, range_start, range_end): + # https://api.linkedin.com/v2/organizationalEntityFollowerStatistics?q=organizationalEntity&organizationalEntity=urn:li:organization:2414183&timeIntervals.timeGranularityType=DAY&timeIntervals.timeRange.start=1451606400000&timeIntervals.timeRange.end=1452211200000 + url = "%s/organizationalEntityFollowerStatistics?q=organizationalEntity&organizationalEntity=urn:li:organization:%s&timeIntervals.timeGranularityType=DAY&timeIntervals.timeRange.start=%s&timeIntervals.timeRange.end=%s" % ( + ENDPOINTS.BASE, organization_id, range_start, range_end) + return self.make_get_request(url) + + def retrieve_organization_page_statistics(self, organization_id): + # https://api.linkedin.com/v2/organizationPageStatistics?q=organization&organization={organization URN} + url = "%s/organizationPageStatistics?q=organization&organization=urn:li:organization:%s" % ( + ENDPOINTS.BASE, organization_id) + return self.make_get_request(url) + + def retrieve_share_statistics(self, organization_id): + # https://api.linkedin.com/v2/organizationalEntityShareStatistics?q=organizationalEntity&organizationalEntity={organization URN} + url = "%s/organizationalEntityShareStatistics?q=organizationalEntity&organizationalEntity=urn:li:organization:%s" % ( + ENDPOINTS.BASE, organization_id) + return self.make_get_request(url) + + def retrieve_organization_brand_page_statistics(self, brand_id): + # https://api.linkedin.com/v2/brandPageStatistics?q=brand&brand=urn:li:organizationBrand:3617422 + url = "%s/brandPageStatistics?q=brand&brand=urn:li:organizationBrand:%s" % ( + ENDPOINTS.BASE, brand_id) + return self.make_get_request(url) + + def delete_share(self, share_id): + # https://api.linkedin.com/v2/shares/{share ID} + url = "%s/shares/%s" % (ENDPOINTS.BASE, share_id) + response = self.make_request('DELETE', url) + # raise_for_error(response) + return response.json() + + def retrieve_likes_on_shares(self, share_id): + # https://api.linkedin.com/v2/socialActions/{shareUrn|ugcPostUrn|commentUrn|groupPostUrn}/likes + url = "%s/socialActions/urn:li:share:%s/likes" % ( + ENDPOINTS.BASE, share_id) + return self.make_get_request(url) + + def retrieve_comments_on_shares(self, share_id): + # https://api.linkedin.com/v2/socialActions/{shareUrn|ugcPostUrn|commentUrn|groupPostUrn}/comments + url = "%s/socialActions/urn:li:share:%s/comments" % ( + ENDPOINTS.BASE, share_id) + return self.make_get_request(url) + + def retrieve_statistics_specific_shares(self, organization_id, share_ids): + # https://api.linkedin.com/v2/organizationalEntityShareStatistics?q=organizationalEntity&organizationalEntity=urn:li:organization:2414183&shares[0]=urn:li:share:1000000&shares[1]=urn:li:share:1000001 + shaer_str = '' + count = 0 + for item in share_ids: + shaer_str = '&shares['+str(count)+']=urn:li:share:'+item + count = count + 1 + url = "%s/organizationalEntityShareStatistics?q=organizationalEntity&organizationalEntity=urn:li:organization:%s%s" % ( + ENDPOINTS.BASE, organization_id, shaer_str) + return self.make_get_request(url) diff --git a/linkedin_v2/models.py b/linkedin_v2/models.py index 613a105..266196c 100644 --- a/linkedin_v2/models.py +++ b/linkedin_v2/models.py @@ -1,94 +1,107 @@ -# -*- coding: utf-8 -*- -import collections - -AccessToken = collections.namedtuple('AccessToken', ['access_token', 'expires_in']) - - -class LinkedInRecipient(object): - def __init__(self, member_id, email, first_name, last_name): - assert member_id or email, 'Either member ID or email must be given' - if member_id: - self.member_id = str(member_id) - else: - self.member_id = None - self.email = email - self.first_name = first_name - self.last_name = last_name - - @property - def json(self): - result = {'person': None} - if self.member_id: - result['person'] = {'_path': '/people/id=%s' % self.member_id} - else: - result['person'] = {'_path': '/people/email=%s' % self.email} - - if self.first_name: - result['person']['first-name'] = self.first_name - - if self.last_name: - result['person']['last-name'] = self.last_name - - return result - - -class LinkedInInvitation(object): - def __init__(self, subject, body, recipients, connect_type, auth_name=None, - auth_value=None): - self.subject = subject - self.body = body - self.recipients = recipients - self.connect_type = connect_type - self.auth_name = auth_name - self.auth_value = auth_value - - @property - def json(self): - result = { - 'recipients': { - 'values': [] - }, - 'subject': self.subject, - 'body': self.body, - 'item-content': { - 'invitation-request': { - 'connect-type': self.connect_type - } - } - } - for recipient in self.recipients: - result['recipients']['values'].append(recipient.json) - - if self.auth_name and self.auth_value: - auth = {'name': self.auth_name, 'value': self.auth_value} - result['item-content']['invitation-request']['authorization'] = auth - - return result - - -class LinkedInMessage(object): - def __init__(self, subject, body, recipients, auth_name=None, - auth_value=None): - self.subject = subject - self.body = body - self.recipients = recipients - self.auth_name = auth_name - self.auth_value = auth_value - - @property - def json(self): - result = { - 'recipients': { - 'values': [] - }, - 'subject': self.subject, - 'body': self.body, - } - for recipient in self.recipients: - result['recipients']['values'].append(recipient.json) - - if self.auth_name and self.auth_value: - auth = {'name': self.auth_name, 'value': self.auth_value} - result['item-content']['invitation-request']['authorization'] = auth - - return result +# -*- coding: utf-8 -*- +import collections + +TIMEOUT = 60 # secs +ACCESS_TOKEN_KEY = 'access_token' +EXPIRES_IN_KEY = 'expires_in' +REFRESH_TOKEN_KEY = 'refresh_token' +REFRESH_TOKEN_EXPIRES_IN_KEY = 'refresh_token_expires_in' + +AccessToken = collections.namedtuple('AccessToken', + [ + ACCESS_TOKEN_KEY, + EXPIRES_IN_KEY, + REFRESH_TOKEN_KEY, + REFRESH_TOKEN_EXPIRES_IN_KEY + ] +) + + +class LinkedInRecipient(object): + def __init__(self, member_id, email, first_name, last_name): + assert member_id or email, 'Either member ID or email must be given' + if member_id: + self.member_id = str(member_id) + else: + self.member_id = None + self.email = email + self.first_name = first_name + self.last_name = last_name + + @property + def json(self): + result = {'person': None} + if self.member_id: + result['person'] = {'_path': '/people/id=%s' % self.member_id} + else: + result['person'] = {'_path': '/people/email=%s' % self.email} + + if self.first_name: + result['person']['first-name'] = self.first_name + + if self.last_name: + result['person']['last-name'] = self.last_name + + return result + + +class LinkedInInvitation(object): + def __init__(self, subject, body, recipients, connect_type, auth_name=None, + auth_value=None): + self.subject = subject + self.body = body + self.recipients = recipients + self.connect_type = connect_type + self.auth_name = auth_name + self.auth_value = auth_value + + @property + def json(self): + result = { + 'recipients': { + 'values': [] + }, + 'subject': self.subject, + 'body': self.body, + 'item-content': { + 'invitation-request': { + 'connect-type': self.connect_type + } + } + } + for recipient in self.recipients: + result['recipients']['values'].append(recipient.json) + + if self.auth_name and self.auth_value: + auth = {'name': self.auth_name, 'value': self.auth_value} + result['item-content']['invitation-request']['authorization'] = auth + + return result + + +class LinkedInMessage(object): + def __init__(self, subject, body, recipients, auth_name=None, + auth_value=None): + self.subject = subject + self.body = body + self.recipients = recipients + self.auth_name = auth_name + self.auth_value = auth_value + + @property + def json(self): + result = { + 'recipients': { + 'values': [] + }, + 'subject': self.subject, + 'body': self.body, + } + for recipient in self.recipients: + result['recipients']['values'].append(recipient.json) + + if self.auth_name and self.auth_value: + auth = {'name': self.auth_name, 'value': self.auth_value} + result['item-content']['invitation-request']['authorization'] = auth + + return result diff --git a/linkedin_v2/server.py b/linkedin_v2/server.py index 4032a2a..41e507f 100644 --- a/linkedin_v2/server.py +++ b/linkedin_v2/server.py @@ -1,40 +1,39 @@ -# -*- coding: utf-8 -*- -import BaseHTTPServer -import urlparse - -from .linkedin import LinkedInApplication, LinkedInAuthentication, PERMISSIONS - - -def quick_api(api_key, secret_key, port=8000): - """ - This method helps you get access to linkedin api quickly when using it - from the interpreter. - Notice that this method creates http server and wait for a request, so it - shouldn't be used in real production code - it's just an helper for debugging - - The usage is basically: - api = quick_api(KEY, SECRET) - After you do that, it will print a URL to the screen which you must go in - and allow the access, after you do that, the method will return with the api - object. - """ - auth = LinkedInAuthentication(api_key, secret_key, 'http://localhost:8000/', - PERMISSIONS.enums.values()) - app = LinkedInApplication(authentication=auth) - print auth.authorization_url - _wait_for_user_to_enter_browser(app, port) - return app - - -def _wait_for_user_to_enter_browser(app, port): - class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler): - def do_GET(self): - p = self.path.split('?') - if len(p) > 1: - params = urlparse.parse_qs(p[1], True, True) - app.authentication.authorization_code = params['code'][0] - app.authentication.get_access_token() - - server_address = ('', port) - httpd = BaseHTTPServer.HTTPServer(server_address, MyHandler) - httpd.handle_request() +# -*- coding: utf-8 -*- +import BaseHTTPServer +import urlparse + +from .linkedin import LinkedInApplication, LinkedInAuthentication, PERMISSIONS + + +def quick_api(api_key, secret_key, port=8000): + """ + This method helps you get access to linkedin api quickly when using it + from the interpreter. + Notice that this method creates http server and wait for a request, so it + shouldn't be used in real production code - it's just an helper for debugging + + The usage is basically: + api = quick_api(KEY, SECRET) + After you do that, it will print a URL to the screen which you must go in + and allow the access, after you do that, the method will return with the api + object. + """ + auth = LinkedInAuthentication(api_key, secret_key, 'http://localhost:8000/', + PERMISSIONS.enums.values()) + app = LinkedInApplication(authentication=auth) + _wait_for_user_to_enter_browser(app, port) + return app + + +def _wait_for_user_to_enter_browser(app, port): + class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler): + def do_GET(self): + p = self.path.split('?') + if len(p) > 1: + params = urlparse.parse_qs(p[1], True, True) + app.authentication.authorization_code = params['code'][0] + app.authentication.get_access_token() + + server_address = ('', port) + httpd = BaseHTTPServer.HTTPServer(server_address, MyHandler) + httpd.handle_request() diff --git a/linkedin_v2/utils.py b/linkedin_v2/utils.py index 8b10aeb..2e0d8d1 100644 --- a/linkedin_v2/utils.py +++ b/linkedin_v2/utils.py @@ -70,8 +70,8 @@ def raise_for_error(response): raise ex(message) else: raise LinkedInError(error.message) - except (ValueError, TypeError): - raise LinkedInError(error.message) + except (ValueError, TypeError, AttributeError): + raise LinkedInError(str(error)) HTTP_METHODS = enum('HTTPMethod', GET='GET', POST='POST',