"""
MIT License
Copyright (c) 2019-2021 Terbau
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import datetime
import asyncio
import logging
import time
from aioxmpp import JID
from aiohttp import BaseConnector
from typing import (Iterable, Union, Optional, Any, Awaitable, Callable, Dict,
List, Tuple)
from .errors import (PartyError, HTTPException, NotFound, Forbidden,
DuplicateFriendship, FriendshipRequestAlreadySent,
MaxFriendshipsExceeded, InviteeMaxFriendshipsExceeded,
InviteeMaxFriendshipRequestsExceeded, PartyIsFull)
from .xmpp import XMPPClient
from .http import HTTPClient
from .user import (ClientUser, User, BlockedUser, SacSearchEntryUser,
UserSearchEntry)
from .friend import Friend, IncomingPendingFriend, OutgoingPendingFriend
from .enums import (Platform, Region, UserSearchPlatform, AwayStatus,
SeasonStartTimestamp, SeasonEndTimestamp,
BattlePassStat, StatsCollectionType)
from .party import (DefaultPartyConfig, DefaultPartyMemberConfig, ClientParty,
Party)
from .stats import StatsV2, StatsCollection, _StatsBase
from .store import Store
from .news import BattleRoyaleNewsPost
from .playlist import Playlist
from .presence import Presence
from .auth import Auth, RefreshTokenAuth
from .avatar import Avatar
from .typedefs import MaybeCoro, DatetimeOrTimestamp, StrOrInt
from .utils import LockEvent, MaybeLock, from_iso, is_display_name
log = logging.getLogger(__name__)
[github]class StartContext:
[github] def __init__(self, client: 'BasicClient',
dispatch_ready: bool = True) -> None:
self.client = client
self.dispatch_ready = dispatch_ready
[github] async def start(self) -> asyncio.Task:
await self.client.init()
task = asyncio.create_task(
self.client._start(dispatch_ready=self.dispatch_ready)
)
await asyncio.wait(
(asyncio.create_task(self.client.wait_until_ready()), task),
return_when=asyncio.FIRST_COMPLETED,
)
return task
[github] async def __aenter__(self) -> asyncio.Task:
return await self.start()
[github] async def __aexit__(self, *args) -> None:
if not self.client._closing and not self.client._closed:
await self.client.close()
[github] def __await__(self) -> None:
async def awaiter():
task = await self.start()
return await task
return awaiter().__await__()
[github]async def _start_client(client: 'BasicClient', *,
shutdown_on_error: bool = True,
after: Optional[MaybeCoro] = None,
error_after: Optional[MaybeCoro] = None,
) -> None:
loop = asyncio.get_running_loop()
if not isinstance(client, BasicClient):
raise TypeError(
'client must be an instance derived from fortnitepy.BasicClient'
)
async def starter():
try:
await client.start()
except Exception as e:
return e
tasks = (
loop.create_task(starter()),
loop.create_task(client.wait_until_ready())
)
try:
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
except asyncio.CancelledError:
for task in tasks:
task.cancel()
else:
done_task = done.pop()
e = done_task.result()
if e is not None:
await client.close()
identifier = client.auth.identifier
if shutdown_on_error:
if e.args:
e.args = ('{0} - {1}'.format(identifier, e.args[0]),)
else:
e.args = (identifier,)
raise e
else:
if error_after is not None:
if asyncio.iscoroutinefunction(after):
asyncio.ensure_future(error_after(client, e))
else:
error_after(client, e)
return
message = ('An exception occured while running client '
'{0}'.format(identifier))
return loop.call_exception_handler({
'message': message,
'exception': e,
'task': done_task
})
if after:
if asyncio.iscoroutinefunction(after):
asyncio.ensure_future(after(client))
else:
after(client)
await pending.pop()
[github]def _before_event(callback):
event = asyncio.Event()
is_processing = False
async def processor():
nonlocal is_processing
if not is_processing:
is_processing = True
try:
await callback()
finally:
event.set()
else:
await event.wait()
return processor
[github][docs] async def start_multiple(clients: List['BasicClient'], *,
gap_timeout: float = 0.2,
shutdown_on_error: bool = True,
ready_callback: Optional[MaybeCoro] = None,
error_callback: Optional[MaybeCoro] = None,
all_ready_callback: Optional[MaybeCoro] = None,
before_start: Optional[Awaitable] = None,
before_close: Optional[Awaitable] = None
) -> None:
"""|coro|
Starts multiple clients at the same time.
.. warning::
This function is blocking and should be the last function to run.
.. info::
Due to throttling by epicgames on login, the clients are started
with a 0.2 second gap. You can change this value with the gap_timeout
keyword argument.
Parameters
----------
clients: List[:class:`BasicClient`]
A list of the clients you wish to start.
gap_timeout: :class:`float`
The time to sleep between starting clients. Defaults to ``0.2``.
shutdown_on_error: :class:`bool`
If the function should cancel all other start tasks if one of the
tasks fails. You can catch the error by try excepting.
ready_callback: Optional[Union[Callable[:class:`BasicClient`], Awaitable[:class:`BasicClient`]]]
A callable/async callback taking a single parameter ``client``.
The callback is called whenever a client is ready.
error_callback: Optional[Union[Callable[:class:`BasicClient`, Exception], Awaitable[:class:`BasicClient`, Exception]]]
A callable/async callback taking two parameters, :class:`BasicClient`
and an exception. The callback is called whenever a client fails
logging in. The callback is not called if ``shutdown_on_error`` is
``True``.
all_ready_callback: Optional[Union[Callable, Awaitable]]
A callback/async callback that is called whenever all clients
have finished logging in, regardless if one of the clients failed
logging in. That means that the callback is always called when all
clients are either logged in or raised an error.
before_start: Optional[Awaitable]
An async callback that is called when just before the clients are
beginning to start. This must be a coroutine as all the clients
wait to start until this callback is finished processing so you
can do heavy start stuff like opening database connections, sessions
etc.
before_close: Optional[Awaitable]
An async callback that is called when the clients are beginning to
close. This must be a coroutine as all the clients wait to close until
this callback is finished processing so you can do heavy close stuff
like closing database connections, sessions etc.
Raises
------
AuthException
Raised if invalid credentials in any form was passed or some
other misc failure.
ValueError
Two or more clients with the same authentication identifier was
passed. This means that you attemted to start two or more clients
with the same credentials.
HTTPException
A request error occured while logging in.
""" # noqa
loop = asyncio.get_running_loop()
async def waiter(client):
_, pending = await asyncio.wait(
(loop.create_task(client.wait_until_ready()),
loop.create_task(client.wait_until_closed())),
return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
async def all_ready_callback_runner():
tasks = [loop.create_task(waiter(client))
for client in clients]
await asyncio.gather(*tasks)
if all(client.is_closed() for client in clients):
return
log.info('All clients started.')
if all_ready_callback:
if asyncio.iscoroutinefunction(all_ready_callback):
asyncio.ensure_future(all_ready_callback())
else:
all_ready_callback()
# Do a check to see if any duplicate clients have been passed
identifiers = []
for client in clients:
identifier = client.auth.identifier
if identifier in identifiers:
raise ValueError(
'Two or more clients with the same auth identifier was passed.'
' Identifier = {}'.format(repr(identifier))
)
identifiers.append(identifier)
await asyncio.gather(*[client.init() for client in clients])
asyncio.ensure_future(all_ready_callback_runner())
_before_start = _before_event(before_start)
_before_close = _before_event(before_close)
tasks = {}
for i, client in enumerate(clients, 1):
tasks[client] = loop.create_task(_start_client(
client,
shutdown_on_error=shutdown_on_error,
after=ready_callback,
error_after=error_callback
))
if before_start is not None:
client.add_event_handler('before_start', _before_start)
if before_close is not None:
client.add_event_handler('before_close', _before_close)
# sleeping between starting to avoid throttling
if i < len(clients):
await asyncio.sleep(gap_timeout)
log.debug('Starting all clients')
return_when = (asyncio.FIRST_EXCEPTION
if shutdown_on_error
else asyncio.ALL_COMPLETED)
done, pending = await asyncio.wait(
list(tasks.values()),
return_when=return_when
)
done_task = done.pop()
if pending and done_task.exception() is not None:
raise done_task.exception()
[github][docs] async def close_multiple(clients: Iterable['BasicClient']) -> None:
"""|coro|
Closes multiple clients at the same time by calling
:meth:`BasicClient.close()` on all of them.
Parameters
----------
clients: Iterable[:class:`BasicClient`]
An iterable of the clients you wish to close. If a client is already
closing or closed, it will get skipped without raising an error.
"""
loop = asyncio.get_running_loop()
tasks = [
loop.create_task(client.close()) for client in clients
if not client._closing and not client.is_closed()
]
await asyncio.gather(*tasks)
[github][docs] def run_multiple(clients: List['BasicClient'], *,
gap_timeout: float = 0.2,
shutdown_on_error: bool = True,
ready_callback: Optional[MaybeCoro] = None,
error_callback: Optional[MaybeCoro] = None,
all_ready_callback: Optional[MaybeCoro] = None,
before_start: Optional[Awaitable] = None,
before_close: Optional[Awaitable] = None
) -> None:
"""This function sets up a loop and then calls :func:`start_multiple()`
for you. If you already have a running event loop, you should start
the clients with :func:`start_multiple()`. On shutdown, all clients
will be closed gracefully.
.. warning::
This function is blocking and should be the last function to run.
.. info::
Due to throttling by epicgames on login, the clients are started
with a 0.2 second gap. You can change this value with the gap_timeout
keyword argument.
Parameters
----------
clients: List[:class:`BasicClient`]
A list of the clients you wish to start.
gap_timeout: :class:`float`
The time to sleep between starting clients. Defaults to ``0.2``.
shutdown_on_error: :class:`bool`
If the function should cancel all other start tasks if one of the
tasks fails. You can catch the error by try excepting.
ready_callback: Optional[Union[Callable[:class:`BasicClient`], Awaitable[:class:`BasicClient`]]]
A callable/async callback taking a single parameter ``client``.
The callback is called whenever a client is ready.
error_callback: Optional[Union[Callable[:class:`BasicClient`, Exception], Awaitable[:class:`BasicClient`, Exception]]]
A callable/async callback taking two parameters, :class:`BasicClient`
and an exception. The callback is called whenever a client fails
logging in. The callback is not called if ``shutdown_on_error`` is
``True``.
all_ready_callback: Optional[Union[Callable, Awaitable]]
A callback/async callback that is called whenever all clients
have finished logging in, regardless if one of the clients failed
logging in. That means that the callback is always called when all
clients are either logged in or raised an error.
before_start: Optional[Awaitable]
An async callback that is called when just before the clients are
beginning to start. This must be a coroutine as all the clients
wait to start until this callback is finished processing so you
can do heavy start stuff like opening database connections, sessions
etc.
before_close: Optional[Awaitable]
An async callback that is called when the clients are beginning to
close. This must be a coroutine as all the clients wait to close until
this callback is finished processing so you can do heavy close stuff
like closing database connections, sessions etc.
Raises
------
AuthException
Raised if invalid credentials in any form was passed or some
other misc failure.
HTTPException
A request error occured while logging in.
""" # noqa
async def runner():
try:
await start_multiple(
clients,
gap_timeout=gap_timeout,
shutdown_on_error=shutdown_on_error,
ready_callback=ready_callback,
error_callback=error_callback,
all_ready_callback=all_ready_callback,
before_start=before_start,
before_close=before_close,
)
finally:
await close_multiple(clients)
try:
asyncio.run(runner())
except KeyboardInterrupt:
pass
[github][docs] class BasicClient:
"""Represents a basic stripped down version of :class:`Client`. You
might want to use this client if your only goal is to make simple
requests like user or stats fetching.
This client does **not** support the following:
- Parties (except :meth:`BasicClient.fetch_party()`)
- Friends
- Anything related to XMPP (Messaging and most events)
Supported events by this client:
- :meth:`event_ready()`
- :meth:`event_before_start()`
- :meth:`event_before_close()`
- :meth:`event_restart()`
- :meth:`event_device_auth_generate()`
- :meth:`event_auth_refresh()`
Parameters
----------
auth: :class:`Auth`
The authentication method to use. You can read more about available
authentication methods :ref:`here <authentication>`.
http_connector: :class:`aiohttp.BaseConnector`
The connector to use for http connection pooling.
http_retry_config: Optional[:class:`HTTPRetryConfig`]
The config to use for http retries.
build: :class:`str`
The build used by Fortnite.
Defaults to a valid but maybe outdated value.
os: :class:`str`
The os version string to use in the user agent.
Defaults to ``Windows/10.0.17134.1.768.64bit`` which is valid no
matter which platform you have set.
cache_users: :class:`bool`
Whether or not the library should cache :class:`User` objects. Disable
this if you are running a program with lots of users as this could
potentially take a big hit on the memory usage. Defaults to ``True``.
Attributes
----------
user: :class:`ClientUser`
The user the client is logged in as.
""" # noqa
[github] def __init__(self, auth: Auth,
**kwargs: Any) -> None:
self.cache_users = kwargs.get('cache_users', True)
self.build = kwargs.get('build', '++Fortnite+Release-14.10-CL-14288110') # noqa
self.os = kwargs.get('os', 'Windows/10.0.17134.1.768.64bit')
self.kill_other_sessions = True
self.accept_eula = True
self.event_prefix = 'event_'
self.auth = auth
self.http = HTTPClient(
self,
connector=kwargs.get('http_connector'),
retry_config=kwargs.get('http_retry_config')
)
self.http.add_header('Accept-Language', 'en-EN')
self._listeners = {}
self._events = {}
self._users = {}
self._refresh_times = []
self._exception_future = None
self._ready_event = None
self._closed_event = None
self._reauth_lock = None
self._refresh_task = None
self._start_runner_task = None
self._closed = False
self._closing = False
self._restarting = False
self._first_start = True
self._has_async_init = False
self.setup_internal()
[github] async def _async_init(self) -> None:
# We must deal with loop stuff after a loop has been
# created by asyncio.run(). This is called at the start
# of start().
self.loop = asyncio.get_running_loop()
self._exception_future = self.loop.create_future()
self._ready_event = asyncio.Event()
self._closed_event = asyncio.Event()
self._reauth_lock = LockEvent()
self._reauth_lock.failed = False
self.auth.initialize(self)
[github][docs] def register_connectors(self,
http_connector: Optional[BaseConnector] = None
) -> None:
"""This can be used to register a http connector after the client has
already been initialized. It must however be called before
:meth:`start()` has been called, or in :meth:`event_before_start()`.
.. warning::
Connectors passed will not be closed on shutdown. You must close
them yourself if you want a graceful shutdown.
Parameters
----------
http_connector: :class:`aiohttp.BaseConnector`
The connector to use for the http session.
"""
if http_connector is not None:
if self.http.connection_exists():
raise RuntimeError(
'http_connector must be registered before startup.')
self.http.connector = http_connector
[github] def setup_internal(self) -> None:
logger = logging.getLogger('aioxmpp')
if logger.getEffectiveLevel() == 30:
logger.setLevel(level=logging.ERROR)
[github] def register_methods(self) -> None:
methods = (func for func in dir(self) if callable(getattr(self, func)))
for method_name in methods:
if method_name.startswith(self.event_prefix):
event = method_name[len(self.event_prefix):]
func = getattr(self, method_name)
self.add_event_handler(event, func)
[github] async def init(self) -> None:
if not self._has_async_init:
self._has_async_init = True
await self._async_init()
[github][docs] def run(self) -> None:
"""This function starts the loop and then calls :meth:`start` for you.
If your program already has an asyncio loop setup, you should use
:meth:`start()` instead.
.. warning::
This function is blocking and should be the last function to run.
Raises
------
AuthException
Raised if invalid credentials in any form was passed or some
other misc failure.
HTTPException
A request error occured while logging in.
"""
async def runner():
async with self.start() as start_future:
await start_future
try:
asyncio.run(runner())
except KeyboardInterrupt:
# StartContext automatically closes for us, so we just catch
# the KeyboardInterrupt wihtout doing anything more here.
pass
[github][docs] def start(self, dispatch_ready: bool = True) -> StartContext:
"""|coro|
Starts the client and logs into the specified user.
This method can be used as a coroutine or an async context manager,
depending on your needs.
How to use as an async context manager: ::
async with client.start():
user = await client.fetch_user('Ninja')
print(user.display_name)
If you want to use it as an async context manager, but also keep the
client running forever, you can await the return of start like this: ::
async with client.start() as future:
user = await client.fetch_user('Ninja')
print(user.display_name)
await future # Nothing after this line will run.
.. warning::
This method is blocking if you await it as a coroutine or you
await the return future. This means that no code coming after
will run until the client is closed. When the client is ready
it will dispatch :meth:`event_ready`.
Parameters
----------
dispatch_ready: :class:`bool`
Whether or not the client should dispatch the ready event when
ready.
Raises
------
AuthException
Raised if invalid credentials in any form was passed or some
other misc failure.
HTTPException
A request error occured while logging in.
"""
return StartContext(self, dispatch_ready=dispatch_ready)
[github] async def _start(self, dispatch_ready: bool = True) -> None:
await self.init()
if self._first_start:
self.register_methods()
if dispatch_ready:
await self.dispatch_and_wait_event('before_start')
# Do this after before_start() in case any connectors
# are registered during the execution.
self.http.create_connection()
self._first_start = False
_started_while_restarting = self._restarting
pri = self._reauth_lock.priority if _started_while_restarting else 0
self._closed_event.clear()
if self._closed:
self.http.create_connection()
self._closed = False
ret = await self._login(priority=pri)
if ret is False:
return
self._set_ready()
if dispatch_ready:
self.dispatch_event('ready')
async def waiter(task):
done, _ = await asyncio.wait(
(task, self._exception_future),
return_when=asyncio.FIRST_COMPLETED
)
try:
exc = done.pop().exception()
except asyncio.CancelledError:
pass
else:
raise exc
self._refresh_task = self.loop.create_task(
self.auth.run_refresh_loop()
)
await waiter(self._refresh_task)
if not _started_while_restarting and self._restarting:
[github] async def runner():
await self.loop.create_future()
self._start_runner_task = self.loop.create_task(runner())
await waiter(self._start_runner_task)
[github] async def _setup_client_user(self, priority: int = 0):
tasks = [
self.http.account_get_by_user_id(
self.auth.account_id,
priority=priority
),
self.http.account_graphql_get_clients_external_auths(
priority=priority
),
self.http.account_get_external_auths_by_id(
self.auth.account_id,
priority=priority
),
]
data, ext_data, extra_ext_data, *_ = await asyncio.gather(*tasks)
data['externalAuths'] = ext_data['myAccount']['externalAuths'] or []
data['extraExternalAuths'] = extra_ext_data
self.user = ClientUser(self, data)
[github] async def _login(self, priority: int = 0) -> None:
log.debug('Running authenticating')
ret = await self.auth._authenticate(priority=priority)
if ret is False:
return False
await self._setup_client_user(priority=priority)
if self.auth.eula_check_needed() and self.accept_eula:
await self.auth.accept_eula(
priority=priority
)
log.debug('EULA accepted')
[github] async def _kill_tokens(self) -> None:
async def killer(token):
if token is None:
return
try:
await self.http.account_sessions_kill_token(token)
except HTTPException:
# All exchanged sessions should be killed when the original
# session is killed, but this doesn't seem to be consistant.
# The solution is to attempt to kill each token and then just
# catch 401.
pass
if not self._restarting:
tasks = (
killer(getattr(self.auth, 'ios_access_token', None)),
killer(getattr(self.auth, 'access_token', None)),
)
await asyncio.gather(*tasks)
[github] def _clear_caches(self) -> None:
self._users.clear()
[github] async def _close(self, *,
close_http: bool = True,
dispatch_close: bool = True,
priority: int = 0) -> None:
self._closing = True
await self._kill_tokens()
self._clear_caches()
if self._ready_event is not None:
self._ready_event.clear()
if close_http:
self._closed = True
await self.http.close()
if self.auth.refresh_loop_running():
self._refresh_task.cancel()
if not self._restarting:
if (self._start_runner_task is not None
and not self._start_runner_task.cancelled()):
self._start_runner_task.cancel()
self._closing = False
if dispatch_close and self._closed_event is not None:
self._set_closed()
log.debug('Successfully logged out')
[github][docs] async def close(self, *,
close_http: bool = True,
dispatch_close: bool = True) -> None:
"""|coro|
Logs the user out and closes running services.
Parameters
----------
close_http: :class:`bool`
Whether or not to close the clients :class:`aiohttp.ClientSession`
when logged out.
dispatch_close: :class:`bool`
Whether or not to dispatch the close event.
Raises
------
HTTPException
An error occured while logging out.
"""
if dispatch_close:
await asyncio.gather(
self.dispatch_and_wait_event('before_close'),
self.dispatch_and_wait_event('close'),
)
await self._close(
close_http=close_http,
dispatch_close=dispatch_close
)
[github][docs] def is_closed(self) -> bool:
""":class:`bool`: Whether the client is running or not."""
return self._closed
[github] def can_restart(self) -> bool:
return hasattr(self.auth, 'ios_refresh_token')
[github][docs] async def restart(self) -> None:
"""|coro|
Restarts the client completely. All events received while this method
runs are dispatched when it has finished.
Raises
------
AuthException
Raised if invalid credentials in any form was passed or some
other misc failure.
HTTPException
A request error occured while logging in.
"""
self._reauth_lock.priority += 1
priority = self._reauth_lock.priority
async with MaybeLock(self._reauth_lock):
self._restarting = True
self._refresh_times.append(time.time())
ios_refresh_token = self.auth.ios_refresh_token
self.recover_events()
await self._close(
close_http=False,
dispatch_close=False,
priority=priority
)
auth = RefreshTokenAuth(
refresh_token=ios_refresh_token
)
auth.initialize(self)
self.auth = auth
[github] async def runner():
try:
await self.start(dispatch_ready=False)
except Exception as e:
return e
tasks = (
self.loop.create_task(runner()),
self.loop.create_task(self.wait_until_ready()),
)
d, p = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
done_task = d.pop()
if done_task.result() is not None:
p.pop().cancel()
raise done_task.result()
self.dispatch_event('restart')
self._restarting = False
log.debug('Successully restarted the client.')
[github] def recover_events(self) -> None:
pass
[github] def _set_ready(self) -> None:
self._ready_event.set()
[github] def _set_closed(self) -> None:
self._closed_event.set()
[github][docs] def is_ready(self) -> bool:
"""Specifies if the internal state of the client is ready.
Returns
-------
:class:`bool`
``True`` if the internal state is ready else ``False``
"""
return self._ready_event.is_set()
[github][docs] async def wait_until_ready(self) -> None:
"""|coro|
Waits until the internal state of the client is ready.
"""
if self._ready_event is not None:
await self._ready_event.wait()
else:
raise RuntimeError(
'The client has not been fully initialized. Make sure '
'Client.init() has been called before using this method.'
)
[github][docs] async def wait_until_closed(self) -> None:
"""|coro|
Waits until the client is fully closed.
"""
if self._closed_event is not None:
await self._closed_event.wait()
else:
raise RuntimeError(
'The client has not been fully initialized. Make sure '
'Client.init() has been called before using this method.'
)
[github][docs] async def fetch_user_by_display_name(self, display_name: str, *,
cache: bool = False,
raw: bool = False
) -> Optional[User]:
"""|coro|
Fetches a user from the passed display name.
Parameters
----------
display_name: :class:`str`
The display name of the user you want to fetch the user for.
cache: :class:`bool`
If set to True it will try to get the user from the friends or
user cache.
.. note::
Setting this parameter to False will make it an api call.
raw: :class:`bool`
If set to True it will return the data as you would get it from
the api request.
.. note::
Setting raw to True does not work with cache set to True.
Raises
------
HTTPException
An error occured while requesting the user.
Returns
-------
Optional[:class:`User`]
The user requested. If not found it will return ``None``.
"""
if cache:
for u in self._users.values():
try:
if u.display_name is not None:
if u.display_name.casefold() == display_name.casefold(): # noqa
return u
except AttributeError:
pass
try:
data = await self.http.account_get_by_display_name(display_name)
except HTTPException as e:
error_code = 'errors.com.epicgames.account.account_not_found'
if e.message_code == error_code:
return None
raise
if raw:
return data
return self.store_user(data, try_cache=cache)
[github][docs] async def fetch_users_by_display_name(self, display_name: str, *,
raw: bool = False
) -> Optional[User]:
"""|coro|
Fetches all users including external users (accounts from other
platforms) that matches the given the display name.
.. warning::
This function is not for requesting multiple users by multiple
display names. Use :meth:`BasicClient.fetch_user()` for that.
Parameters
----------
display_name: :class:`str`
The display name of the users you want to get.
raw: :class:`bool`
If set to True it will return the data as you would get it from
the api request. *Defaults to ``False``*
Raises
------
HTTPException
An error occured while requesting the user.
Returns
-------
List[:class:`User`]
A list containing all payloads found for this user.
"""
res = await self.http.account_graphql_get_by_display_name(display_name)
if raw:
return res['account']
return [User(self, account) for account in res['account']]
[github][docs] async def fetch_user(self, user, *,
cache: bool = False,
raw: bool = False
) -> Optional[User]:
"""|coro|
Fetches a single user by the given id/displayname.
Parameters
----------
user: :class:`str`
Id or display name
cache: :class:`bool`
If set to True it will try to get the user from the friends or
user cache and fall back to an api request if not found.
.. note::
Setting this parameter to False will make it an api call.
raw: :class:`bool`
If set to True it will return the data as you would get it from
the api request.
.. note::
Setting raw to True does not work with cache set to True.
Raises
------
HTTPException
An error occured while requesting the user.
Returns
-------
Optional[:class:`User`]
The user requested. If not found it will return ``None``
"""
try:
data = await self.fetch_users((user,), cache=cache, raw=raw)
return data[0]
except IndexError:
return None
[github][docs] async def fetch_users(self, users: Iterable[str], *,
cache: bool = False,
raw: bool = False) -> List[User]:
"""|coro|
Fetches multiple users at once by the given ids/displaynames.
Parameters
----------
users: Iterable[:class:`str`]
An iterable containing ids/displaynames.
cache: :class:`bool`
If set to True it will try to get the users from the friends or
user cache and fall back to an api request if not found.
.. note::
Setting this parameter to False will make it an api call.
raw: :class:`bool`
If set to True it will return the data as you would get it from
the api request.
.. note::
Setting raw to True does not work with cache set to True.
Raises
------
HTTPException
An error occured while requesting user information.
Returns
-------
List[:class:`User`]
Users requested. Only users that are found gets returned.
"""
_users = []
new = []
tasks = []
def find_by_display_name(dn):
if cache:
for u in self._users.values():
try:
if u.display_name is not None:
if u.display_name.casefold() == dn.casefold():
_users.append(u)
return
except AttributeError:
pass
task = self.http.account_get_by_display_name(elem)
tasks.append(task)
for elem in users:
if is_display_name(elem):
find_by_display_name(elem)
else:
if cache:
p = self.get_user(elem)
if p:
if raw:
_users.append(p.get_raw())
else:
_users.append(p)
continue
new.append(elem)
if not _users and not new and not tasks:
return []
if len(tasks) > 0:
pfs = await asyncio.gather(*tasks)
for account_data in pfs:
new.append(account_data['id'])
chunk_tasks = []
chunks = (new[i:i + 100] for i in range(0, len(new), 100))
for chunk in chunks:
task = self.http.account_get_multiple_by_user_id(chunk) # noqa
chunk_tasks.append(task)
if len(chunk_tasks) > 0:
d = await asyncio.gather(*chunk_tasks)
for results in d:
for result in results:
if raw:
_users.append(result)
else:
u = self.store_user(result, try_cache=cache)
_users.append(u)
return _users
[github][docs] async def fetch_user_by_email(self, email, *,
cache: bool = False,
raw: bool = False) -> Optional[User]:
"""|coro|
Fetches a single user by the email.
.. warning::
Because of epicgames throttling policy, you can only do this
request three times in a timespan of 600 seconds. If you were
to do more than three requests in that timespan, a
:exc:`HTTPException` would be raised.
Parameters
----------
email: :class:`str`
The email of the account you are requesting.
cache: :class:`bool`
If set to True it will try to get the user from the friends or
user cache and fall back to an api request if not found.
.. note::
This method does two api requests but with this set to False
only one request will be done as long as the user is found in
one of the caches.
raw: :class:`bool`
If set to True it will return the data as you would get it from
the api request.
.. note::
Setting raw to True does not work with cache set to True.
Raises
------
HTTPException
An error occured while requesting the user.
Returns
-------
Optional[:class:`User`]
The user requested. If not found it will return ``None``
"""
try:
res = await self.http.account_get_by_email(email)
except HTTPException as e:
m = 'errors.com.epicgames.account.account_not_found'
if e.message_code == m:
return None
raise
account_id = res['id']
return await self.fetch_user(account_id, cache=cache, raw=raw)
[github][docs] async def search_users(self, prefix: str,
platform: UserSearchPlatform
) -> List[UserSearchEntry]:
"""|coro|
Searches after users by a prefix and returns up to 100 matches.
Parameters
----------
prefix: :class:`str`
| The prefix you want to search by. The prefix is case insensitive.
| Example: ``Tfue`` will return Tfue's user + up to 99 other
users which have display names that start with or match exactly
to ``Tfue`` like ``Tfue_Faze dequan``.
platform: :class:`UserSearchPlatform`
The platform you wish to search by.
.. note::
The platform is only important for prefix matches. All exact
matches are returned regardless of which platform is
specified.
Raises
------
HTTPException
An error occured while requesting.
Returns
-------
List[:class:`UserSearchEntry`]
An ordered list of users that matched the prefix.
"""
if not isinstance(platform, UserSearchPlatform):
raise TypeError(
'The platform passed must be a constant from '
'fortnitepy.UserSearchPlatform'
)
res = await self.http.user_search_by_prefix(
self.user.id,
prefix,
platform.value
)
user_ids = (d['accountId'] for d in res)
users = await self.fetch_users(user_ids, raw=True)
lookup = {p['id']: p for p in users}
entries = []
for data in res:
user_data = lookup.get(data['accountId'])
if user_data is None:
continue
obj = UserSearchEntry(self, user_data, data)
entries.append(obj)
return entries
[github][docs] async def fetch_avatars(self, users: List[str]) -> Dict[str, Avatar]:
"""|coro|
Fetches the avatars of the provided user ids.
.. warning::
You can only fetch avatars of friends. That means that the bot has
to be friends with the users you are requesting the avatars of.
Parameters
----------
users: List[:class:`str`]
A list containing user ids.
Raises
------
HTTPException
An error occured while requesting.
Returns
-------
Dict[:class:`str`, :class:`Avatar`]
A dict containing avatars mapped to their user id.
"""
chunk_tasks = []
chunks = (users[i:i + 100] for i in range(0, len(users), 100))
for chunk in chunks:
task = self.http.avatar_get_multiple_by_user_id(chunk)
chunk_tasks.append(task)
results = {}
if len(chunk_tasks) > 0:
d = await asyncio.gather(*chunk_tasks)
for chunk_results in d:
for avatar_data in chunk_results:
results[avatar_data['accountId']] = Avatar(avatar_data)
return results
[github][docs] async def search_sac_by_slug(self, slug: str) -> List[SacSearchEntryUser]:
"""|coro|
Searches for an owner of slug + retrieves owners of similar slugs.
Parameters
----------
slug: :class:`str`
The slug (support a creator code) you wish to search for.
Raises
------
HTTPException
An error occured while requesting fortnite's services.
Returns
-------
List[:class:`SacSearchEntryUser`]
An ordered list of users who matched the exact or slightly
modified slug.
"""
res = await self.http.payment_website_search_sac_by_slug(slug)
user_ids = (e['id'] for e in res)
users = await self.fetch_users(
user_ids,
raw=True
)
lookup = {p['id']: p for p in users}
entries = []
for data in res:
user_data = lookup.get(data['id'])
if user_data is None:
continue
obj = SacSearchEntryUser(self, user_data, data)
entries.append(obj)
return entries
[github] def store_user(self, data: dict, *, try_cache: bool = True) -> User:
try:
user_id = data.get(
'accountId',
data.get('id', data.get('account_id'))
)
if try_cache:
return self._users[user_id]
except KeyError:
pass
user = User(self, data)
if self.cache_users:
self._users[user.id] = user
return user
[github][docs] def get_user(self, user_id: str) -> Optional[User]:
"""Tries to get a user from the user cache by the given user id.
Parameters
----------
user_id: :class:`str`
The id of the user.
Returns
-------
Optional[:class:`User`]
The user if found, else ``None``
"""
return self._users.get(user_id)
[github][docs] async def fetch_blocklist(self) -> List[str]:
"""|coro|
Retrieves the blocklist with an api call.
Raises
------
HTTPException
An error occured while fetching blocklist.
Returns
-------
List[:class:`str`]
List of ids
"""
return await self.http.friends_get_blocklist()
[github][docs] async def block_user(self, user_id: str) -> None:
"""|coro|
Blocks a user by a given user id.
Parameters
----------
user_id: :class:`str`
The id of the user you want to block.
Raises
------
HTTPException
Something went wrong when trying to block this user.
"""
await self.http.friends_block(user_id)
[github][docs] async def unblock_user(self, user_id: str) -> None:
"""|coro|
Unblocks a user by a given user id.
Parameters
----------
user_id: :class:`str`
The id of the user you want to unblock
Raises
------
HTTPException
Something went wrong when trying to unblock this user.
"""
await self.http.friends_unblock(user_id)
[github][docs] async def add_friend(self, user_id: str) -> None:
"""|coro|
Sends a friend request to the specified user id.
Parameters
----------
user_id: :class:`str`
The id of the user you want to add.
Raises
------
NotFound
The specified user does not exist.
DuplicateFriendship
The client is already friends with this user.
FriendshipRequestAlreadySent
The client has already sent a friendship request that has not been
handled yet by the user.
MaxFriendshipsExceeded
The client has hit the max amount of friendships a user can
have at a time. For most accounts this limit is set to ``1000``
but it could be higher for others.
InviteeMaxFriendshipsExceeded
The user you attempted to add has hit the max amount of friendships
a user can have at a time.
InviteeMaxFriendshipRequestsExceeded
The user you attempted to add has hit the max amount of friendship
requests a user can have at a time. This is usually ``700`` total
requests.
Forbidden
The client is not allowed to send friendship requests to the user
because of the users settings.
HTTPException
An error occured while requesting to add this friend.
"""
try:
await self.http.friends_add_or_accept(user_id)
except HTTPException as exc:
m = 'errors.com.epicgames.friends.account_not_found'
if exc.message_code == m:
raise NotFound('The specified account does not exist.')
m = 'errors.com.epicgames.friends.duplicate_friendship'
if exc.message_code == m:
raise DuplicateFriendship('This friendship already exists.')
m = 'errors.com.epicgames.friends.friend_request_already_sent'
if exc.message_code == m:
raise FriendshipRequestAlreadySent(
'A friendship request already exists for this user.'
)
m = 'errors.com.epicgames.friends.inviter_friendships_limit_exceeded' # noqa
if exc.message_code == m:
raise MaxFriendshipsExceeded(
'The client has hit the friendships limit.'
)
m = 'errors.com.epicgames.friends.invitee_friendships_limit_exceeded' # noqa
if exc.message_code == m:
raise InviteeMaxFriendshipsExceeded(
'The user has hit the friendships limit.'
)
m = 'errors.com.epicgames.friends.incoming_friendships_limit_exceeded' # noqa
if exc.message_code == m:
raise InviteeMaxFriendshipRequestsExceeded(
'The user has hit the incoming friendship requests limit.'
)
m = ('errors.com.epicgames.friends.'
'cannot_friend_due_to_target_settings')
if exc.message_code == m:
raise Forbidden('You cannot send friendship requests to '
'this user.')
raise
[github][docs] async def accept_friend(self, user_id: str) -> None:
"""|coro|
Accepts a request.
Parameters
----------
user_id: :class:`str`
The id of the user you want to accept.
Raises
------
NotFound
The specified user does not exist.
DuplicateFriendship
The client is already friends with this user.
FriendshipRequestAlreadySent
The client has already sent a friendship request that has not been
handled yet by the user.
Forbidden
The client is not allowed to send friendship requests to the user
because of the users settings.
HTTPException
An error occured while requesting to accept this friend.
"""
await self.add_friend(user_id)
[github][docs] async def remove_or_decline_friend(self, user_id: str) -> None:
"""|coro|
Removes a friend by the given id.
Parameters
----------
user_id: :class:`str`
The id of the friend you want to remove.
Raises
------
HTTPException
Something went wrong when trying to remove this friend.
"""
await self.http.friends_remove_or_decline(user_id)
[github] async def dispatch_and_wait_event(self, event: str,
*args: Any,
**kwargs: Any) -> None:
coros = self._events.get(event, [])
tasks = [asyncio.create_task(coro()) for coro in coros]
if tasks:
await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
[github] def _dispatcher(self, coro: Awaitable,
*args: Any,
**kwargs: Any) -> asyncio.Future:
return asyncio.ensure_future(coro(*args, **kwargs))
[github] def dispatch_event(self, event: str,
*args: Any,
**kwargs: Any) -> List[asyncio.Future]:
listeners = self._listeners.get(event)
if listeners:
removed = []
for i, (future, check) in enumerate(listeners):
if future.cancelled():
removed.append(i)
continue
try:
result = check(*args)
except Exception as e:
future.set_exception(e)
removed.append(i)
else:
if result:
if len(args) == 0:
future.set_result(None)
elif len(args) == 1:
future.set_result(args[0])
else:
future.set_result(args)
removed.append(i)
if len(removed) == len(listeners):
self._listeners.pop(event)
else:
for idx in reversed(removed):
del listeners[idx]
tasks = []
if event in self._events:
for coro in self._events[event]:
task = self._dispatcher(coro, *args, **kwargs)
tasks.append(task)
return tasks
[github][docs] def wait_for(self, event: str, *,
check: Callable = None,
timeout: Optional[int] = None) -> Any:
"""|coro|
Waits for an event to be dispatch.
In case the event returns more than one arguments, a tuple is passed
containing the arguments.
Examples
--------
This example waits for the author of a :class:`FriendMessage` to say
hello.: ::
@client.event
async def event_friend_message(message):
await message.reply('Say hello!')
def check_function(m):
return m.author.id == message.author.id
msg = await client.wait_for('message', check=check_function, timeout=60)
await msg.reply('Hello {0.author.display_name}!'.format(msg))
This example waits for the the leader of a party to promote the bot
after joining and then sets a new custom key: ::
@client.event
async def event_party_member_join(member):
# checks if the member that joined is the UserClient
if member.id != client.user.id:
return
def check(m):
return m.id == client.user.id
try:
await client.wait_for('party_member_promote', check=check, timeout=120)
except asyncio.TimeoutError:
await member.party.send('You took too long to promote me!')
await member.party.set_custom_key('my_custom_key_123')
Parameters
----------
event: :class:`str`
The name of the event.
.. note::
| The name of the event must be **without** the ``event_``
prefix.
|
| Wrong = ``event_friend_message``.
| Correct = ``friend_message``.
check: Optional[Callable]
A predicate to check what to wait for.
Defaults to a predicate that always returns ``True``. This means
it will return the first result unless you pass another predicate.
timeout: :class:`int`
How many seconds to wait for before asyncio.TimeoutError is raised.
*Defaults to ``None`` which means it will wait forever.*
Raises
------
asyncio.TimeoutError
No event was retrieved in the time you specified.
Returns
-------
Any
Returns arguments based on the event you are waiting for. An event
might return no arguments, one argument or a tuple of arguments.
Check the :ref:`event reference <fortnitepy-events-api> for more
information about the returning arguments.`
""" # noqa
future = self.loop.create_future()
if check is None:
check = _check
ev = (event.lower()).replace(self.event_prefix, '')
try:
listeners = self._listeners[ev]
except KeyError:
listeners = []
self._listeners[ev] = listeners
listeners.append((future, check))
return asyncio.wait_for(future, timeout)
[github] def _event_has_handler(self, event: str) -> bool:
handlers = self._events.get(event.lower())
return handlers is not None and len(handlers) > 0
[github] def _event_has_destination(self, event: str) -> bool:
if event in self._listeners:
return True
elif self._event_has_handler(event):
return True
return False
[github][docs] def add_event_handler(self, event: str, coro: Awaitable[Any]) -> None:
"""Registers a coroutine as an event handler. You can register as many
coroutines as you want to a single event.
Parameters
----------
event: :class:`str`
The name of the event you want to register this coro for.
coro: :ref:`coroutine <coroutine>`
The coroutine to function as the handler for the specified event.
Raises
------
TypeError
The function passed to coro is not a coroutine.
"""
if not asyncio.iscoroutinefunction(coro):
raise TypeError('event registered must be a coroutine function')
if event.startswith(self.event_prefix):
event = event[len(self.event_prefix):]
if event not in self._events:
self._events[event] = []
self._events[event].append(coro)
[github][docs] def remove_event_handler(self, event: str, coro: Awaitable) -> None:
"""Removes a coroutine as an event handler.
Parameters
----------
event: :class:`str`
The name of the event you want to remove this coro for.
coro: :ref:`coroutine <coroutine>`
The coroutine that already functions as a handler for the
specified event.
"""
if event not in self._events:
return
self._events[event] = [c for c in self._events[event] if c != coro]
[github][docs] def event(self,
event_or_coro: Union[str, Awaitable[Any]] = None) -> Awaitable:
"""A decorator to register an event.
.. note::
You do not need to decorate events in a subclass of
:class:`BasicClient` but the function names of event handlers must
follow this format ``event_<event>``.
Usage: ::
@client.event
async def event_friend_message(message):
await message.reply('Thanks for your message!')
@client.event('friend_message')
async def my_message_handler(message):
await message.reply('Thanks for your message!')
Raises
------
TypeError
The decorated function is not a coroutine.
TypeError
Event is not specified as argument or function name with event
prefix.
"""
is_coro = callable(event_or_coro)
def pred(coro):
if isinstance(coro, staticmethod):
coro = coro.__func__
if not asyncio.iscoroutinefunction(coro):
raise TypeError('the decorated function must be a coroutine')
if is_coro or event_or_coro is None:
if not coro.__name__.startswith(self.event_prefix):
raise TypeError('non specified events must follow '
'this function name format: '
'"{}<event>"'.format(self.event_prefix))
name = coro.__name__[len(self.event_prefix):]
else:
name = event_or_coro
self.add_event_handler(name, coro)
log.debug('{} has been registered as a handler for the '
'event {}'.format(coro.__name__, name))
return coro
return pred(event_or_coro) if is_coro else pred
[github] def _process_stats_times(self, start_time: Optional[DatetimeOrTimestamp] = None, # noqa
end_time: Optional[DatetimeOrTimestamp] = None
) -> Tuple[Optional[int], Optional[int]]:
epoch = datetime.datetime.utcfromtimestamp(0)
if isinstance(start_time, datetime.datetime):
start_time = int((start_time - epoch).total_seconds())
elif isinstance(start_time, SeasonStartTimestamp):
start_time = start_time.value
if isinstance(end_time, datetime.datetime):
end_time = int((end_time - epoch).total_seconds())
elif isinstance(end_time, SeasonEndTimestamp):
end_time = end_time.value
return start_time, end_time
[github][docs] async def fetch_br_stats(self, user_id: str, *,
start_time: Optional[DatetimeOrTimestamp] = None,
end_time: Optional[DatetimeOrTimestamp] = None
) -> StatsV2:
"""|coro|
Gets Battle Royale stats the specified user.
Parameters
----------
user_id: :class:`str`
The id of the user you want to fetch stats for.
start_time: Optional[Union[:class:`int`, :class:`datetime.datetime`, :class:`SeasonStartTimestamp`]]
The UTC start time of the time period to get stats from.
*Must be seconds since epoch, :class:`datetime.datetime` or a constant from SeasonEndTimestamp*
*Defaults to None*
end_time: Optional[Union[:class:`int`, :class:`datetime.datetime`, :class:`SeasonEndTimestamp`]]
The UTC end time of the time period to get stats from.
*Must be seconds since epoch, :class:`datetime.datetime` or a constant from SeasonEndTimestamp*
*Defaults to None*
Raises
------
Forbidden
| The user has chosen to be hidden from public stats by disabling
the fortnite setting below.
| ``Settings`` -> ``Account and Privacy`` -> ``Show on career
leaderboard``
HTTPException
An error occured while requesting.
Returns
-------
:class:`StatsV2`
An object representing the stats for this user. If the user was
not found ``None`` is returned.
""" # noqa
start_time, end_time = self._process_stats_times(start_time, end_time)
tasks = [
self.fetch_user(user_id, cache=True),
self.http.stats_get_v2(
user_id,
start_time=start_time,
end_time=end_time
)
]
results = await asyncio.gather(*tasks)
if results[1] == '':
raise Forbidden('This user has chosen to be hidden '
'from public stats.')
return StatsV2(*results) if results[0] is not None else None
[github] async def _multiple_stats_chunk_requester(self, user_ids: List[str], stats: List[str], *, # noqa
collection: Optional[str] = None,
start_time: Optional[DatetimeOrTimestamp] = None, # noqa
end_time: Optional[DatetimeOrTimestamp] = None # noqa
) -> List[dict]:
chunks = [user_ids[i:i+51] for i in range(0, len(user_ids), 51)]
stats_chunks = [stats[i:i+20] for i in range(0, len(stats), 20)]
tasks = []
for chunk in chunks:
for stats_chunk in stats_chunks:
tasks.append(self.http.stats_get_multiple_v2(
chunk,
stats_chunk,
category=collection,
start_time=start_time,
end_time=end_time
))
results = await asyncio.gather(*tasks)
return [item for sub in results for item in sub]
[github] async def _fetch_multiple_br_stats(self, cls: _StatsBase,
user_ids: List[str],
stats: List[str],
*,
collection: Optional[str] = None, # noqa
start_time: Optional[DatetimeOrTimestamp] = None, # noqa
end_time: Optional[DatetimeOrTimestamp] = None, # noqa
) -> Dict[str, StatsV2]:
start_time, end_time = self._process_stats_times(start_time, end_time)
tasks = [
self.fetch_users(user_ids, cache=True),
self._multiple_stats_chunk_requester(
user_ids,
stats,
collection=collection,
start_time=start_time,
end_time=end_time
)
]
results = await asyncio.gather(*tasks)
if len(results[0]) > 0 and isinstance(results[0][0], dict):
results = results[::-1]
res = {}
for udata in results[1]:
if udata['accountId'] in res and res[udata['accountId']] is not None: # noqa
res[udata['accountId']].raw['stats'].update(udata['stats'])
continue
r = [x for x in results[0] if x.id == udata['accountId']]
user = r[0] if len(r) != 0 else None
res[udata['accountId']] = (cls(user, udata)
if user is not None else None)
return res
[github][docs] async def fetch_multiple_br_stats(self, user_ids: List[str],
stats: List[str],
*,
start_time: Optional[DatetimeOrTimestamp] = None, # noqa
end_time: Optional[DatetimeOrTimestamp] = None # noqa
) -> Dict[str, Optional[StatsV2]]:
"""|coro|
Gets Battle Royale stats for multiple users at the same time.
.. note::
This function is not the same as doing :meth:`fetch_br_stats` for
multiple users. The expected return for this function would not be
all the stats for the specified users but rather the stats you
specify.
Example usage: ::
async def stat_function():
stats = [
fortnitepy.StatsV2.create_stat('placetop1', fortnitepy.V2Input.KEYBOARDANDMOUSE, 'defaultsolo'),
fortnitepy.StatsV2.create_stat('kills', fortnitepy.V2Input.KEYBOARDANDMOUSE, 'defaultsolo'),
fortnitepy.StatsV2.create_stat('matchesplayed', fortnitepy.V2Input.KEYBOARDANDMOUSE, 'defaultsolo')
]
# get the users and create a list of their ids.
users = await self.fetch_users(['Ninja', 'DrLupo'])
user_ids = [u.id for u in users] + ['NonValidUserIdForTesting']
data = await self.fetch_multiple_br_stats(user_ids=user_ids, stats=stats)
for id, res in data.items():
if res is not None:
print('ID: {0} | Stats: {1}'.format(id, res.get_stats()))
else:
print('ID: {0} not found.'.format(id))
# Example output:
# ID: 463ca9d604524ce38071f512baa9cd70 | Stats: {'keyboardmouse': {'defaultsolo': {'wins': 759, 'kills': 28093, 'matchesplayed': 6438}}}
# ID: 3900c5958e4b4553907b2b32e86e03f8 | Stats: {'keyboardmouse': {'defaultsolo': {'wins': 1763, 'kills': 41375, 'matchesplayed': 7944}}}
# ID: 4735ce9132924caf8a5b17789b40f79c | Stats: {'keyboardmouse': {'defaultsolo': {'wins': 1888, 'kills': 40784, 'matchesplayed': 5775}}}
# ID: NonValidUserIdForTesting not found.
Parameters
----------
user_ids: List[:class:`str`]
A list of ids you are requesting the stats for.
stats: List[:class:`str`]
A list of stats to get for the users. Use
:meth:`StatsV2.create_stat` to create the stats.
Example: ::
[
fortnitepy.StatsV2.create_stat('placetop1', fortnitepy.V2Input.KEYBOARDANDMOUSE, 'defaultsolo'),
fortnitepy.StatsV2.create_stat('kills', fortnitepy.V2Input.KEYBOARDANDMOUSE, 'defaultsolo'),
fortnitepy.StatsV2.create_stat('matchesplayed', fortnitepy.V2Input.KEYBOARDANDMOUSE, 'defaultsolo')
]
start_time: Optional[Union[:class:`int`, :class:`datetime.datetime`, :class:`SeasonStartTimestamp`]]
The UTC start time of the time period to get stats from.
*Must be seconds since epoch, :class:`datetime.datetime` or a constant from SeasonEndTimestamp*
*Defaults to None*
end_time: Optional[Union[:class:`int`, :class:`datetime.datetime`, :class:`SeasonEndTimestamp`]]
The UTC end time of the time period to get stats from.
*Must be seconds since epoch, :class:`datetime.datetime` or a constant from SeasonEndTimestamp*
*Defaults to None*
Raises
------
HTTPException
An error occured while requesting.
Returns
-------
Dict[:class:`str`, Optional[:class:`StatsV2`]]
A mapping where :class:`StatsV2` is bound to its owners id. If a
userid was not found then the value bound to that userid will be
``None``.
.. note::
If a users stats is missing in the returned mapping it means
that the user has opted out of public leaderboards and that
the client therefore does not have permissions to requests
their stats.
""" # noqa
res = await self._fetch_multiple_br_stats(
cls=StatsV2,
user_ids=user_ids,
stats=stats,
start_time=start_time,
end_time=end_time,
)
return res
[github][docs] async def fetch_multiple_br_stats_collections(self, user_ids: List[str],
collection: Optional[StatsCollectionType] = None, # noqa
*,
start_time: Optional[DatetimeOrTimestamp] = None, # noqa
end_time: Optional[DatetimeOrTimestamp] = None # noqa
) -> Dict[str, Optional[StatsCollection]]: # noqa
"""|coro|
Gets Battle Royale stats collections for multiple users at the same time.
Parameters
----------
user_ids: List[:class:`str`]
A list of ids you are requesting the stats for.
collection: :class:`StatsCollectionType`
The collection to receive. Collections are predefined
stats that it attempts to request.
start_time: Optional[Union[:class:`int`, :class:`datetime.datetime`, :class:`SeasonStartTimestamp`]]
The UTC start time of the time period to get stats from.
*Must be seconds since epoch, :class:`datetime.datetime` or a constant from SeasonEndTimestamp*
*Defaults to None*
end_time: Optional[Union[:class:`int`, :class:`datetime.datetime`, :class:`SeasonEndTimestamp`]]
The UTC end time of the time period to get stats from.
*Must be seconds since epoch, :class:`datetime.datetime` or a constant from SeasonEndTimestamp*
*Defaults to None*
Raises
------
HTTPException
An error occured while requesting.
Returns
-------
Dict[:class:`str`, Optional[:class:`StatsCollection`]]
A mapping where :class:`StatsCollection` is bound to its owners id. If a
userid was not found then the value bound to that userid will be
``None``.
.. note::
If a users stats is missing in the returned mapping it means
that the user has opted out of public leaderboards and that
the client therefore does not have permissions to requests
their stats.
""" # noqa
res = await self._fetch_multiple_br_stats(
cls=StatsCollection,
user_ids=user_ids,
stats=[],
collection=collection.value,
start_time=start_time,
end_time=end_time,
)
return res
[github][docs] async def fetch_multiple_battlepass_levels(self,
users: List[str],
season: int,
*,
start_time: Optional[DatetimeOrTimestamp] = None, # noqa
end_time: Optional[DatetimeOrTimestamp] = None # noqa
) -> Dict[str, float]:
"""|coro|
Fetches multiple users battlepass level.
Parameters
----------
users: List[:class:`str`]
List of user ids.
season: :class:`int`
The season number to request the battlepass levels for.
.. warning::
If you are requesting the previous season and the new season has not been
added to the library yet (check :class:`SeasonStartTimestamp`), you have to
manually include the previous seasons end timestamp in epoch seconds.
start_time: Optional[Union[:class:`int`, :class:`datetime.datetime`, :class:`SeasonStartTimestamp`]]
The UTC start time of the window to get the battlepass level from.
*Must be seconds since epoch, :class:`datetime.datetime` or a constant from SeasonEndTimestamp*
*Defaults to None*
end_time: Optional[Union[:class:`int`, :class:`datetime.datetime`, :class:`SeasonEndTimestamp`]]
The UTC end time of the window to get the battlepass level from.
*Must be seconds since epoch, :class:`datetime.datetime` or a constant from SeasonEndTimestamp*
*Defaults to None*
Raises
------
HTTPException
An error occured while requesting.
Returns
-------
Dict[:class:`str`, Optional[:class:`float`]]
Users battlepass level mapped to their account id. Returns ``None``
if no battlepass level was found. If a user has career board set
to private, he/she will not appear in the result. Therefore you
should never expect a user to be included.
.. note::
The decimals are the percent progress to the next level.
E.g. ``208.63`` -> ``Level 208 and 63% on the way to 209.``
.. note::
If a users battlepass level is missing in the returned mapping it means
that the user has opted out of public leaderboards and that
the client therefore does not have permissions to requests
their stats.
""" # noqa
start_time, end_time = self._process_stats_times(start_time, end_time)
if end_time is not None:
e = getattr(SeasonStartTimestamp, 'SEASON_{}'.format(season), None)
if e is not None and end_time < e.value:
raise ValueError(
'end_time can\'t be lower than the seasons start timestamp'
)
e = getattr(BattlePassStat, 'SEASON_{}'.format(season), None)
if e is not None:
info = e.value
stats = info[0] if isinstance(info[0], tuple) else (info[0],)
end_time = end_time if end_time is not None else info[1]
else:
stats = ('s{0}_social_bp_level'.format(season),)
data = await self._multiple_stats_chunk_requester(
users,
stats,
start_time=start_time,
end_time=end_time
)
def get_stat(user_data):
for stat in stats:
value = user_data.get(stat)
if value is not None:
return value / 100
return {e['accountId']: get_stat(e['stats']) for e in data}
[github][docs] async def fetch_battlepass_level(self, user_id: str, *,
season: int,
start_time: Optional[DatetimeOrTimestamp] = None, # noqa
end_time: Optional[DatetimeOrTimestamp] = None # noqa
) -> float:
"""|coro|
Fetches a users battlepass level.
Parameters
----------
user_id: :class:`str`
The user id to fetch the battlepass level for.
season: :class:`int`
The season number to request the battlepass level for.
.. warning::
If you are requesting the previous season and the new season has not been
added to the library yet (check :class:`SeasonStartTimestamp`), you have to
manually include the previous seasons end timestamp in epoch seconds.
start_time: Optional[Union[:class:`int`, :class:`datetime.datetime`, :class:`SeasonStartTimestamp`]]
The UTC start time of the window to get the battlepass level from.
*Must be seconds since epoch, :class:`datetime.datetime` or a constant from SeasonEndTimestamp*
*Defaults to None*
end_time: Optional[Union[:class:`int`, :class:`datetime.datetime`, :class:`SeasonEndTimestamp`]]
The UTC end time of the window to get the battlepass level from.
*Must be seconds since epoch, :class:`datetime.datetime` or a constant from SeasonEndTimestamp*
*Defaults to None*
Raises
------
Forbidden
User has private career board.
HTTPException
An error occured while requesting.
Returns
-------
Optional[:class:`float`]
The users battlepass level. ``None`` is returned if the user has
not played any real matches this season.
.. note::
The decimals are the percent progress to the next level.
E.g. ``208.63`` -> ``Level 208 and 63% on the way to 209.``
""" # noqa
data = await self.fetch_multiple_battlepass_levels(
(user_id,),
season=season,
start_time=start_time,
end_time=end_time
)
if user_id not in data:
raise Forbidden('User has private career board.')
return data[user_id]
[github][docs] async def fetch_leaderboard(self, stat: str) -> List[Dict[str, StrOrInt]]:
"""|coro|
Fetches the leaderboard for a stat.
.. warning::
For some weird reason, the only valid stat you can pass is
one with ``placetop1`` (``wins`` is also accepted).
Example usage: ::
async def get_leaderboard():
stat = fortnitepy.StatsV2.create_stat(
'wins',
fortnitepy.V2Input.KEYBOARDANDMOUSE,
'defaultsquad'
)
data = await client.fetch_leaderboard(stat)
for placement, entry in enumerate(data):
print('[{0}] Id: {1} | Wins: {2}'.format(
placement, entry['account'], entry['value']))
Parameters
----------
stat: :class:`str`
The stat you are requesting the leaderboard entries for. You can
use :meth:`StatsV2.create_stat` to create this string.
Raises
------
ValueError
You passed an invalid/non-accepted stat argument.
HTTPException
An error occured when requesting.
Returns
-------
List[Dict[:class:`str`, Union[:class:`str`, :class:`int`]]]
List of dictionaries containing entry data. Example return: ::
{
'account': '4480a7397f824fe4b407077fb9397fbb',
'value': 5082
}
"""
data = await self.http.stats_get_leaderboard_v2(stat)
if len(data['entries']) == 0:
raise ValueError('{0} is not a valid stat'.format(stat))
return data['entries']
[github][docs] async def fetch_party(self, party_id: str) -> Party:
"""|coro|
Fetches a party by its id.
Parameters
----------
party_id: :class:`str`
The id of the party.
Raises
------
Forbidden
You are not allowed to look up this party.
Returns
-------
Optional[:class:`Party`]
The party that was fetched. ``None`` if not found.
"""
try:
data = await self.http.party_lookup(party_id)
except HTTPException as exc:
m = 'errors.com.epicgames.social.party.party_not_found'
if exc.message_code == m:
return None
m = 'errors.com.epicgames.social.party.party_query_forbidden'
if exc.message_code == m:
raise Forbidden('You are not allowed to lookup this party.')
raise
party = Party(self, data)
await party._update_members(members=data['members'])
return party
[github][docs] async def fetch_lightswitch_status(self,
service_id: str = 'Fortnite') -> bool:
"""|coro|
Fetches the lightswitch status of an epicgames service.
Parameters
----------
service_id: :class:`str`
The service id to check status for.
Raises
------
ValueError
The returned data was empty. Most likely because service_id is not
valid.
HTTPException
An error occured when requesting.
Returns
-------
:class:`bool`
``True`` if service is up else ``False``
"""
status = await self.http.lightswitch_get_status(service_id=service_id)
if len(status) == 0:
raise ValueError('emtpy lightswitch response')
return True if status[0].get('status') == 'UP' else False
[github][docs] async def fetch_item_shop(self) -> Store:
"""|coro|
Fetches the current item shop.
Example: ::
# fetches all CIDs (character ids) of of the current item shop.
async def get_current_item_shop_cids():
store = await client.fetch_item_shop()
cids = []
for item in store.featured_items + store.daily_items:
for grant in item.grants:
if grant['type'] == 'AthenaCharacter':
cids.append(grant['asset'])
return cids
Raises
------
HTTPException
An error occured when requesting.
Returns
-------
:class:`Store`
Object representing the data from the current item shop.
"""
data = await self.http.fortnite_get_store_catalog()
return Store(self, data)
[github][docs] async def fetch_br_news(self) -> List[BattleRoyaleNewsPost]:
"""|coro|
Fetches news for the Battle Royale gamemode.
Raises
------
HTTPException
An error occured when requesting.
Returns
-------
:class:`list`
List[:class:`BattleRoyaleNewsPost`]
"""
data = await self.http.fortnitecontent_get()
res = []
msg = data['battleroyalenews']['news'].get('message')
if msg is not None:
res.append(BattleRoyaleNewsPost(msg))
else:
msgs = data['battleroyalenews']['news']['messages']
for msg in msgs:
res.append(BattleRoyaleNewsPost(msg))
return res
[github][docs] async def fetch_br_playlists(self) -> List[Playlist]:
"""|coro|
Fetches all playlists registered on Fortnite. This includes all
previous gamemodes that is no longer active.
Raises
------
HTTPException
An error occured while requesting.
Returns
-------
List[:class:`Playlist`]
List containing all playlists registered on Fortnite.
"""
data = await self.http.fortnitecontent_get()
raw = data['playlistinformation']['playlist_info']['playlists']
playlists = []
for playlist in raw:
try:
p = Playlist(playlist)
playlists.append(p)
except KeyError:
pass
return playlists
[github][docs] async def fetch_active_ltms(self, region: Region) -> List[str]:
"""|coro|
Fetches active LTMs for a specific region.
Parameters
----------
region: :class:`Region`
The region to request active LTMs for.
Raises
------
HTTPException
An error occured while requesting.
Returns
-------
List[:class:`str`]
List of internal playlist names. Returns an empty list of none
LTMs are for the specified region.
"""
data = await self.http.fortnite_get_timeline()
states = data['channels']['client-matchmaking']['states']
region_data = states[len(states) - 1]['state']['region'].get(
region.value, {})
return region_data.get('eventFlagsForcedOn', [])
[github] async def join_party(self, party_id: str) -> None:
raise NotImplementedError(
'BasicClient does not support party actions.'
)
[github][docs] class Client(BasicClient):
"""Represents the client connected to Fortnite and EpicGames' services.
Parameters
----------
auth: :class:`Auth`
The authentication method to use. You can read more about available authentication methods
:ref:`here <authentication>`.
http_connector: :class:`aiohttp.BaseConnector`
The connector to use for http connection pooling.
ws_connector: :class:`aiohttp.BaseConnector`
The connector to use for websocket connection pooling. This could be
the same as the above connector.
status: :class:`str`
The status you want the client to send with its presence to friends.
Defaults to: ``Battle Royale Lobby - {party playercount} / {party max playercount}``
away: :class:`AwayStatus`
The away status the client should use for its presence. Defaults to
:attr:`AwayStatus.ONLINE`.
platform: :class:`.Platform`
The platform you want the client to display as its source.
Defaults to :attr:`Platform.WINDOWS`.
net_cl: :class:`str`
The current net cl used by the current Fortnite build. Named **netCL**
in official logs. Defaults to an empty string which is the recommended
usage as of ``v0.9.0`` since you then
won't need to update it when a new update is pushed by Fortnite.
party_version: :class:`int`
The party version the client should use. This value determines which version
should be able to join the party. If a user attempts to join the clients party
with a different party version than the client, then an error will be visible
saying something by the lines of "Their party of Fortnite is older/newer than
yours". If you experience this error I recommend incrementing the default set
value by one since the library in that case most likely has yet to be updated.
Defaults to ``3`` (As of November 3rd 2020).
default_party_config: :class:`DefaultPartyConfig`
The party configuration used when creating parties. If not specified,
the client will use the default values specified in the data class.
default_party_member_config: :class:`DefaultPartyMemberConfig`
The party member configuration used when creating parties. If not specified,
the client will use the default values specified in the data class.
http_retry_config: Optional[:class:`HTTPRetryConfig`]
The config to use for http retries.
build: :class:`str`
The build used by Fortnite.
Defaults to a valid but maybe outdated value.
os: :class:`str`
The os version string to use in the user agent.
Defaults to ``Windows/10.0.17134.1.768.64bit`` which is valid no
matter which platform you have set.
service_host: :class:`str`
The host used by Fortnite's XMPP services.
service_domain: :class:`str`
The domain used by Fortnite's XMPP services.
service_port: :class:`int`
The port used by Fortnite's XMPP services.
cache_users: :class:`bool`
Whether or not the library should cache :class:`User` objects. Disable
this if you are running a program with lots of users as this could
potentially take a big hit on the memory usage. Defaults to ``True``.
fetch_user_data_in_events: :class:`bool`
Whether or not user data should be fetched in event processing. Disabling
this might be useful for larger applications that deals with
possibly being rate limited on their ip. Defaults to ``True``.
.. warning::
Keep in mind that if this option is disabled, there is a big
chance that display names, external auths and more might be missing
or simply is ``None`` on objects deriving from :class:`User`. Keep in
mind that :attr:`User.id` always will be available. You can use
:meth:`User.fetch()` to update all missing attributes.
wait_for_member_meta_in_events: :class:`bool`
Whether or not the client should wait for party member meta (information
about outfit, backpack etc.) before dispatching events like
:func:`event_party_member_join()`. If this is disabled then member objects
in the events won't have the correct meta. Defaults to ``True``.
leave_party_at_shutdown: :class:`bool`
Whether or not the client should leave its current party at shutdown. If this
is set to false, then the client will attempt to reconnect to the party on a
startup. If :attr:`DefaultPartyMemberConfig.offline_ttl` is exceeded before
a reconnect is attempted, then the client will create a new party at startup.
Attributes
----------
user: :class:`ClientUser`
The user the client is logged in as.
party: :class:`ClientParty`
The party the client is currently connected to.
""" # noqa
[github] def __init__(self, auth: Auth,
**kwargs: Any) -> None:
super().__init__(auth=auth, **kwargs)
self.status = kwargs.get('status', 'Battle Royale Lobby - {party_size} / {party_max_size}') # noqa
self.away = kwargs.get('away', AwayStatus.ONLINE)
self.platform = kwargs.get('platform', Platform.WINDOWS)
self.net_cl = kwargs.get('net_cl', '')
self.party_version = kwargs.get('party_version', 3)
self.party_build_id = '1:{0.party_version}:{0.net_cl}'.format(self)
self.default_party_config = kwargs.get('default_party_config', DefaultPartyConfig()) # noqa
self.default_party_member_config = kwargs.get('default_party_member_config', DefaultPartyMemberConfig()) # noqa
self.service_host = kwargs.get('xmpp_host', 'prod.ol.epicgames.com')
self.service_domain = kwargs.get('xmpp_domain', 'xmpp-service-prod.ol.epicgames.com') # noqa
self.service_port = kwargs.get('xmpp_port', 5222)
self.fetch_user_data_in_events = kwargs.get('fetch_user_data_in_events', True) # noqa
self.wait_for_member_meta_in_events = kwargs.get('wait_for_member_meta_in_events', True) # noqa
self.leave_party_at_shutdown = kwargs.get('leave_party_at_shutdown', True) # noqa
self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
self.party = None
self._listeners = {}
self._events = {}
self._friends = {}
self._pending_friends = {}
self._blocked_users = {}
self._presences = {}
self._join_party_lock = None
self._internal_join_party_lock = None
self._join_confirmation = False
self._refresh_times = []
self.setup_internal()
[github] async def _async_init(self) -> None:
# We must deal with loop stuff after a loop has been
# created by asyncio.run(). This is called at the start
# of start().
self.loop = asyncio.get_running_loop()
self._exception_future = self.loop.create_future()
self._ready_event = asyncio.Event()
self._closed_event = asyncio.Event()
self._join_party_lock = LockEvent()
self._internal_join_party_lock = LockEvent()
self._reauth_lock = LockEvent()
self._reauth_lock.failed = False
self.auth.initialize(self)
[github][docs] def register_connectors(self,
http_connector: Optional[BaseConnector] = None,
ws_connector: Optional[BaseConnector] = None
) -> None:
"""This can be used to register connectors after the client has
already been initialized. It must however be called before
:meth:`start()` has been called, or in :meth:`event_before_start()`.
.. warning::
Connectors passed will not be closed on shutdown. You must close
them yourself if you want a graceful shutdown.
Parameters
----------
http_connector: :class:`aiohttp.BaseConnector`
The connector to use for the http session.
ws_connector: :class:`aiohttp.BaseConnector`
The connector to use for the websocket xmpp connection.
"""
super().register_connectors(http_connector=http_connector)
if ws_connector is not None:
if self.xmpp.xmpp_client is not None:
raise RuntimeError(
'ws_connector must be registered before startup')
self.xmpp.ws_connector = ws_connector
@property
def default_party_config(self) -> DefaultPartyConfig:
return self._default_party_config
[github] @default_party_config.setter
def default_party_config(self, obj: DefaultPartyConfig) -> None:
obj._inject_client(self)
self._default_party_config = obj
@property
def default_party_member_config(self) -> DefaultPartyMemberConfig:
return self._default_party_member_config
[github] @default_party_member_config.setter
def default_party_member_config(self, o: DefaultPartyMemberConfig) -> None:
self._default_party_member_config = o
[github] @property
def friends(self) -> List[Friend]:
"""List[:class:`Friend`]: A list of the clients friends."""
return list(self._friends.values())
[github] @property
def friend_count(self) -> int:
""":class:`int`: The amount of friends the bot currently has."""
return len(self._friends)
[github] @property
def pending_friends(self) -> List[Union[IncomingPendingFriend,
OutgoingPendingFriend]]:
"""List[Union[:class:`IncomingPendingFriend`,
:class:`OutgoingPendingFriend`]]: A list of all of the clients
pending friends.
.. note::
Pending friends can be both incoming (pending friend sent the
request to the bot) or outgoing (the bot sent the request to the
pending friend). You must check what kind of pending friend an
object is by their attributes ``incoming`` or ``outgoing``.
""" # noqa
return list(self._pending_friends.values())
[github] @property
def pending_friend_count(self) -> int:
""":class:`int`: The amount of pending friends the bot currently has.
"""
return len(self._pending_friends)
[github] @property
def incoming_pending_friends(self) -> List[IncomingPendingFriend]:
"""List[:class:`IncomingPendingFriend`]: A list of the clients
incoming pending friends.
"""
return [pf for pf in self._pending_friends.values() if pf.incoming]
[github] @property
def incoming_pending_friend_count(self) -> int:
""":class:`int`: The amount of active incoming pending friends the bot
currently has received."""
return len(self.incoming_pending_friends)
[github] @property
def outgoing_pending_friends(self) -> List[OutgoingPendingFriend]:
"""List[:class:`OutgoingPendingFriend`]: A list of the clients
outgoing pending friends.
"""
return [pf for pf in self._pending_friends.values() if pf.outgoing]
[github] @property
def outgoing_pending_friend_count(self) -> int:
""":class:`int`: The amount of active outgoing pending friends the bot
has sent."""
return len(self.outgoing_pending_friends)
[github] @property
def blocked_users(self) -> List[BlockedUser]:
"""List[:class:`BlockedUser`]: A list of the users client has
as blocked.
"""
return list(self._blocked_users.values())
[github] @property
def blocked_user_count(self) -> int:
""":class:`int`: The amount of blocked users the bot currently has
blocked."""
return len(self._blocked_users)
[github] @property
def presences(self) -> List[Presence]:
"""List[:class:`Presence`]: A list of the last presences from
currently online friends.
"""
return list(self._presences.values())
[github] def _check_party_confirmation(self) -> None:
k = 'party_member_confirm'
val = k in self._events and len(self._events[k]) > 0
if val != self._join_confirmation:
self._join_confirmation = val
self.default_party_config.update({'join_confirmation': val})
[github] def register_methods(self) -> None:
super().register_methods()
self._check_party_confirmation()
[github] async def internal_auth_refresh_handler(self):
try:
log.debug('Refreshing xmpp session')
await self.xmpp.close()
await self.xmpp.run()
await self._reconnect_to_party()
except AttributeError:
pass
[github] async def _start(self, dispatch_ready: bool = True) -> None:
if self._first_start:
self.add_event_handler(
'internal_auth_refresh',
self.internal_auth_refresh_handler
)
return await super()._start(dispatch_ready=dispatch_ready)
[github] async def _login(self, priority: int = 0) -> None:
res = await super()._login(priority=priority)
if res is not None:
return res
await self.refresh_caches(priority=priority)
log.debug('Successfully set up caches')
await self.xmpp.run()
log.debug('Connected to XMPP')
await self.initialize_party(priority=priority)
log.debug('Party created')
[github] def _clear_caches(self) -> None:
super()._clear_caches()
self._friends.clear()
self._pending_friends.clear()
self._blocked_users.clear()
self._presences.clear()
[github] async def _close(self, *,
close_http: bool = True,
dispatch_close: bool = True,
priority: int = 0) -> None:
self._closing = True
if self.leave_party_at_shutdown:
try:
if self.party is not None:
await self.party._leave(priority=priority)
except Exception:
pass
try:
await self.xmpp.close()
except Exception:
pass
await super()._close(
close_http=close_http,
dispatch_close=dispatch_close,
priority=priority,
)
[github] def recover_events(self) -> asyncio.Task:
return asyncio.create_task(self._recover_events())
[github] async def _recover_events(self, *,
refresh_caches: bool = False,
wait_for_close: bool = True) -> None:
if wait_for_close:
await self.wait_for('xmpp_session_close')
pre_friends = self.friends
pre_pending = self.pending_friends
await self.wait_for('xmpp_session_establish')
if refresh_caches:
await self.refresh_caches()
for friend in pre_friends:
if friend not in self._friends.values():
self.dispatch_event('friend_remove', friend)
added_friends = []
for friend in self._friends.values():
if friend not in pre_friends:
added_friends.append(friend)
self.dispatch_event('friend_add', friend)
for pending in pre_pending:
if (pending not in self._pending_friends.values()
and pending not in added_friends):
self.dispatch_event('friend_request_abort', pending)
for pending in self._pending_friends.values():
if pending not in pre_pending:
self.dispatch_event('friend_request', pending)
[github] def construct_party(self, data: dict, *,
cls: Optional[ClientParty] = None) -> ClientParty:
clazz = cls or self.default_party_config.cls
return clazz(self, data)
[github] async def initialize_party(self, priority: int = 0) -> None:
data = await self.http.party_lookup_user(
self.user.id,
priority=priority
)
if len(data['current']) > 0:
if not self.leave_party_at_shutdown:
current = data['current'][0]
member_d = None
for member_data in current['members']:
if member_data['account_id'] == self.auth.account_id:
member_d = member_data
break
if member_d is not None:
newest_conn = max(
member_data['connections'],
key=lambda o: from_iso(o['connected_at']),
)
try:
disc_at = from_iso(newest_conn['disconnected_at'])
except KeyError:
pass
else:
now = datetime.datetime.utcnow()
total_seconds = (now - disc_at).total_seconds()
if total_seconds < newest_conn.get('offline_ttl', 30):
return await self._reconnect_to_party(data=data)
party = self.construct_party(data['current'][0])
await party._leave(priority=priority)
log.debug('Left old party')
await self._create_party(priority=priority)
[github] async def refresh_caches(self, priority: int = 0) -> None:
self._friends.clear()
self._pending_friends.clear()
self._blocked_users.clear()
tasks = (
self.http.friends_get_all(
include_pending=True,
priority=priority
),
self.http.friends_get_summary(priority=priority),
self.http.presence_get_last_online(priority=priority),
)
raw_friends, raw_summary, raw_presences = await asyncio.gather(*tasks)
ids = [r['accountId'] for r in raw_friends + raw_summary['blocklist']]
chunks = (ids[i:i + 100] for i in range(0, len(ids), 100))
users = {}
tasks = [
self.http.account_get_multiple_by_user_id(
chunk,
priority=priority
)
for chunk in chunks
]
if tasks:
done = await asyncio.gather(*tasks)
else:
done = []
for results in done:
for user in results:
users[user['id']] = user
# TODO: Add method for fetching friends and other stuff
for friend in raw_friends:
try:
data = users[friend['accountId']]
except KeyError:
continue
if friend['status'] == 'ACCEPTED':
self.store_friend({**friend, **data})
elif friend['status'] == 'PENDING':
if friend['direction'] == 'INBOUND':
self.store_incoming_pending_friend({**friend, **data})
else:
self.store_outgoing_pending_friend({**friend, **data})
for data in raw_summary['friends']:
friend = self.get_friend(data['accountId'])
if friend is not None:
friend._update_summary(data)
for user_id, data in raw_presences.items():
friend = self.get_friend(user_id)
if friend is not None:
try:
value = data[0]['last_online']
except (IndexError, KeyError):
value = None
friend._update_last_logout(
from_iso(value) if value is not None else None
)
for data in raw_summary['blocklist']:
user = users.get(data['accountId'])
if user is not None:
self.store_blocked_user(user)
[github] def store_user(self, data: dict, *, try_cache: bool = True) -> User:
try:
user_id = data.get(
'accountId',
data.get('id', data.get('account_id'))
)
if try_cache:
return self._users[user_id]
except KeyError:
pass
user = User(self, data)
if self.cache_users:
self._users[user.id] = user
return user
[github][docs] def get_user(self, user_id: str) -> Optional[User]:
user = super().get_user(user_id)
if user is None:
friend = self.get_friend(user_id)
if friend is not None:
user = User(self, friend.get_raw())
if self.cache_users:
self._users[user.id] = user
return user
[github] def store_friend(self, data: dict, *,
summary: Optional[dict] = None,
try_cache: bool = True) -> Friend:
try:
user_id = data.get(
'accountId',
data.get('id', data.get('account_id'))
)
if try_cache:
return self._friends[user_id]
except KeyError:
pass
friend = Friend(self, data)
if summary is not None:
friend._update_summary(summary)
self._friends[friend.id] = friend
return friend
[github][docs] def get_friend(self, user_id: str) -> Optional[Friend]:
"""Tries to get a friend from the friend cache by the given user id.
Parameters
----------
user_id: :class:`str`
The id of the friend.
Returns
-------
Optional[:class:`Friend`]
The friend if found, else ``None``
"""
return self._friends.get(user_id)
[github] def store_incoming_pending_friend(self, data: dict, *,
try_cache: bool = True
) -> IncomingPendingFriend:
try:
user_id = data.get(
'accountId',
data.get('id', data.get('account_id'))
)
if try_cache:
return self._pending_friends[user_id]
except KeyError:
pass
pending_friend = IncomingPendingFriend(self, data)
self._pending_friends[pending_friend.id] = pending_friend
return pending_friend
[github] def store_outgoing_pending_friend(self, data: dict, *,
try_cache: bool = True
) -> OutgoingPendingFriend:
try:
user_id = data.get(
'accountId',
data.get('id', data.get('account_id'))
)
if try_cache:
return self._pending_friends[user_id]
except KeyError:
pass
pending_friend = OutgoingPendingFriend(self, data)
self._pending_friends[pending_friend.id] = pending_friend
return pending_friend
[github][docs] def get_pending_friend(self,
user_id: str
) -> Optional[Union[IncomingPendingFriend,
OutgoingPendingFriend]]:
"""Tries to get a pending friend from the pending friend cache by the
given user id.
Parameters
----------
user_id: :class:`str`
The id of the pending friend.
Returns
-------
Optional[Union[:class:`IncomingPendingFriend`,
:class:`OutgoingPendingFriend`]]
The pending friend if found, else ``None``
""" # noqa
return self._pending_friends.get(user_id)
[github][docs] def get_incoming_pending_friend(self,
user_id: str
) -> Optional[IncomingPendingFriend]:
"""Tries to get an incoming pending friend from the pending friends
cache by the given user id.
Parameters
----------
user_id: :class:`str`
The id of the incoming pending friend.
Returns
-------
Optional[:class:`IncomingPendingFriend`]
The incoming pending friend if found, else ``None``.
"""
pending_friend = self.get_pending_friend(user_id)
if pending_friend and pending_friend.incoming:
return pending_friend
[github][docs] def get_outgoing_pending_friend(self,
user_id: str
) -> Optional[OutgoingPendingFriend]:
"""Tries to get an outgoing pending friend from the pending friends
cache by the given user id.
Parameters
----------
user_id: :class:`str`
The id of the outgoing pending friend.
Returns
-------
Optional[:class:`OutgoingPendingFriend`]
The outgoing pending friend if found, else ``None``.
"""
pending_friend = self.get_pending_friend(user_id)
if pending_friend and pending_friend.outgoing:
return pending_friend
[github] def store_blocked_user(self, data: dict, *,
try_cache: bool = True) -> BlockedUser:
try:
user_id = data.get(
'accountId',
data.get('id', data.get('account_id'))
)
if try_cache:
return self._blocked_users[user_id]
except KeyError:
pass
blocked_user = BlockedUser(self, data)
self._blocked_users[blocked_user.id] = blocked_user
return blocked_user
[github][docs] def get_blocked_user(self, user_id: str) -> Optional[BlockedUser]:
"""Tries to get a blocked user from the blocked users cache by the
given user id.
Parameters
----------
user_id: :class:`str`
The id of the blocked user.
Returns
-------
Optional[:class:`BlockedUser`]
The blocked user if found, else ``None``
"""
return self._blocked_users.get(user_id)
[github][docs] def get_presence(self, user_id: str) -> Optional[Presence]:
"""Tries to get the latest received presence from the presence cache.
Parameters
----------
user_id: :class:`str`
The id of the friend you want the last presence of.
Returns
-------
Optional[:class:`Presence`]
The presence if found, else ``None``
"""
return self._presences.get(user_id)
[github][docs] def has_friend(self, user_id: str) -> bool:
"""Checks if the client is friends with the given user id.
Parameters
----------
user_id: :class:`str`
The id of the user you want to check.
Returns
-------
:class:`bool`
``True`` if user is friends with the client else ``False``
"""
return self.get_friend(user_id) is not None
[github][docs] def is_pending(self, user_id: str) -> bool:
"""Checks if the given user id is a pending friend of the client.
Parameters
----------
user_id: :class:`str`
The id of the user you want to check.
Returns
-------
:class:`bool`
``True`` if user is a pending friend else ``False``
"""
return self.get_pending_friend(user_id) is not None
[github][docs] def is_blocked(self, user_id: str) -> bool:
"""Checks if the given user id is blocked by the client.
Parameters
----------
user_id: :class:`str`
The id of the user you want to check.
Returns
-------
:class:`bool`
``True`` if user is blocked else ``False``
"""
return self.get_blocked_user(user_id) is not None
[github][docs] async def accept_friend(self, user_id: str) -> Friend:
"""|coro|
.. warning::
Do not use this method to send a friend request. It will then not
return until the friend request has been accepted by the user.
Accepts a request.
Parameters
----------
user_id: :class:`str`
The id of the user you want to accept.
Raises
------
NotFound
The specified user does not exist.
DuplicateFriendship
The client is already friends with this user.
FriendshipRequestAlreadySent
The client has already sent a friendship request that has not been
handled yet by the user.
Forbidden
The client is not allowed to send friendship requests to the user
because of the users settings.
HTTPException
An error occured while requesting to accept this friend.
Returns
-------
:class:`Friend`
Object of the friend you just added.
"""
await super().add_friend(user_id)
friend = await self.wait_for('friend_add',
check=lambda f: f.id == user_id)
return friend
[github] async def _reconnect_to_party(self, data: Optional[dict] = None) -> None:
if data is None:
data = await self.http.party_lookup_user(
self.user.id
)
if data['current']:
party_data = data['current'][0]
async with self._join_party_lock:
try:
await self._join_party(
party_data,
event='party_member_reconnect'
)
except Exception:
await self._create_party(acquire=False)
raise
else:
await self._create_party()
[github] async def _create_party(self,
config: Optional[dict] = None,
acquire: bool = True,
priority: int = 0) -> ClientParty:
aquiring = not self.auth._refresh_lock.locked() and acquire
try:
if aquiring:
await self._join_party_lock.acquire()
if isinstance(config, dict):
cf = {**self.default_party_config.config, **config}
else:
cf = self.default_party_config.config
while True:
try:
data = await self.http.party_create(
cf,
priority=priority
)
break
except HTTPException as exc:
if exc.message_code != ('errors.com.epicgames.social.'
'party.user_has_party'):
raise
data = await self.http.party_lookup_user(
self.user.id,
priority=priority
)
try:
await self.http.party_leave(
data['current'][0]['id'],
priority=priority
)
except HTTPException as e:
m = ('errors.com.epicgames.social.'
'party.party_not_found')
if e.message_code != m:
raise
await self.xmpp.leave_muc()
config = {**cf, **data['config']}
party = self.construct_party(data)
await party._update_members(
members=data['members'],
priority=priority
)
self.party = party
tasks = [
self.loop.create_task(party.join_chat()),
]
await party.meta.meta_ready_event.wait()
updated, deleted, cfg1 = party.meta.set_privacy(config['privacy'])
edit_updated, edit_deleted, cfg2 = await party._edit(
*party._default_config.meta
)
# Filter out urn:epic:* properties that was set in party create
# payload.
default_schema = {
k: v for k, v in party.meta.schema.items()
if k.startswith('Default:')
}
tasks.append(party.patch(
updated={
**default_schema,
**updated,
**edit_updated,
**party._construct_raw_squad_assignments(),
**party.meta.set_voicechat_implementation('EOSVoiceChat')
},
deleted=[*deleted, *edit_deleted],
priority=priority,
config={**cfg1, **cfg2},
))
await asyncio.gather(*tasks)
return party
finally:
if aquiring:
self._join_party_lock.release()
[github] def is_creating_party(self) -> bool:
return self._join_party_lock.locked()
[github] async def wait_until_party_ready(self) -> None:
await self._join_party_lock.wait()
[github] async def _join_party(self, party_data: dict, *,
event: str = 'party_member_join') -> ClientParty:
async with self._internal_join_party_lock:
party = self.construct_party(party_data)
await party._update_members(party_data['members'])
self.party = party
[github] def check(m):
if m.id != self.user.id:
return False
if party.id != m.party.id:
return False
return True
future = asyncio.ensure_future(
self.wait_for(event, check=check, timeout=5),
)
try:
await self.http.party_join_request(party.id)
except HTTPException as e:
if not future.cancelled():
future.cancel()
m = 'errors.com.epicgames.social.party.party_join_forbidden' # noqa
if e.message_code == m:
raise Forbidden(
'You are not allowed to join this party.'
)
raise
party_data = await self.http.party_lookup(party.id)
party = self.construct_party(party_data)
self.party = party
asyncio.ensure_future(party.join_chat())
await party._update_members(party_data['members'])
try:
await future
except asyncio.TimeoutError:
raise asyncio.TimeoutError('Party join timed out.')
return party
[github][docs] async def join_party(self, party_id: str) -> ClientParty:
"""|coro|
Joins a party by the party id.
Parameters
----------
party_id: :class:`str`
The id of the party you wish to join.
Raises
------
.. warning::
Because the client has to leave its current party before joining
a new one, a new party is created if some of these errors are
raised. Most of the time though this is not the case and the client
will remain in its current party.
PartyError
You are already a member of this party.
NotFound
The party was not found.
PartyIsFull
The party you attempted to join is full.
Forbidden
You are not allowed to join this party because it's private
and you have not been a part of it before.
.. note::
If you have been a part of the party before but got
kicked, you are ineligible to join this party and this
error is raised.
HTTPException
An error occurred when requesting to join the party.
Returns
-------
:class:`ClientParty`
The party that was just joined.
"""
async with self._join_party_lock:
if party_id == self.party.id:
raise PartyError('You are already a member of this party.')
try:
party_data = await self.http.party_lookup(party_id)
except HTTPException as e:
m = 'errors.com.epicgames.social.party.party_not_found'
if e.message_code == m:
raise NotFound(
'Could not find a party with the id {0}'.format(
party_id
)
)
m = 'errors.com.epicgames.social.party.party_query_forbidden' # noqa
if e.message_code == m:
raise Forbidden(
'You are not allowed to join this party.'
)
m = 'errors.com.epicgames.social.party.party_is_full'
if e.message_code == m:
raise PartyIsFull(
'The party you attempted to join is full.'
)
raise
try:
await self.party._leave()
party = await self._join_party(party_data)
return party
except Exception:
await self._create_party(acquire=False)
raise
[github][docs] def set_presence(self, status: str, *,
away: AwayStatus = AwayStatus.ONLINE) -> None:
"""|coro|
Sends and sets the status. This status message will override all other
presence statuses including party presence status.
Parameters
----------
status: :class:`str`
The status you want to set.
away: :class:`AwayStatus`
The away status to use. Defaults to :attr:`AwayStatus.ONLINE`.
Raises
------
TypeError
The status you tried to set were not a str.
"""
if not isinstance(status, str):
raise TypeError('status must be a str')
self.status = status
self.away = away
self.party.update_presence()
[github][docs] async def send_presence(self, status: Union[str, dict], *,
away: AwayStatus = AwayStatus.ONLINE,
to: Optional[JID] = None) -> None:
"""|coro|
Sends this status to all or one single friend.
Parameters
----------
status: Union[:class:`str`, :class:`dict`]
The status message in :class:`str` or full status in :class:`dict`.
away: :class:`AwayStatus`
The away status to use. Defaults to :attr:`AwayStatus.ONLINE`.
to: Optional[:class:`aioxmpp.JID`]
The JID of the user that should receive this status.
*Defaults to None which means it will send to all friends.*
Raises
------
TypeError
Status was an invalid type.
"""
await self.xmpp.send_presence(
status=status,
show=away.value,
to=to
)