mas_storage_pg/
app_session.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7//! A module containing PostgreSQL implementation of repositories for sessions
8
9use async_trait::async_trait;
10use mas_data_model::{CompatSession, CompatSessionState, Device, Session, SessionState, User};
11use mas_storage::{
12    Clock, Page, Pagination,
13    app_session::{AppSession, AppSessionFilter, AppSessionRepository, AppSessionState},
14    compat::CompatSessionFilter,
15    oauth2::OAuth2SessionFilter,
16};
17use oauth2_types::scope::{Scope, ScopeToken};
18use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
19use sea_query::{
20    Alias, ColumnRef, CommonTableExpression, Expr, PostgresQueryBuilder, Query, UnionType,
21};
22use sea_query_binder::SqlxBinder;
23use sqlx::PgConnection;
24use tracing::Instrument;
25use ulid::Ulid;
26use uuid::Uuid;
27
28use crate::{
29    DatabaseError, ExecuteExt,
30    errors::DatabaseInconsistencyError,
31    filter::StatementExt,
32    iden::{CompatSessions, OAuth2Sessions},
33    pagination::QueryBuilderExt,
34};
35
36/// An implementation of [`AppSessionRepository`] for a PostgreSQL connection
37pub struct PgAppSessionRepository<'c> {
38    conn: &'c mut PgConnection,
39}
40
41impl<'c> PgAppSessionRepository<'c> {
42    /// Create a new [`PgAppSessionRepository`] from an active PostgreSQL
43    /// connection
44    pub fn new(conn: &'c mut PgConnection) -> Self {
45        Self { conn }
46    }
47}
48
49mod priv_ {
50    // The enum_def macro generates a public enum, which we don't want, because it
51    // triggers the missing docs warning
52
53    use std::net::IpAddr;
54
55    use chrono::{DateTime, Utc};
56    use sea_query::enum_def;
57    use uuid::Uuid;
58
59    #[derive(sqlx::FromRow)]
60    #[enum_def]
61    pub(super) struct AppSessionLookup {
62        pub(super) cursor: Uuid,
63        pub(super) compat_session_id: Option<Uuid>,
64        pub(super) oauth2_session_id: Option<Uuid>,
65        pub(super) oauth2_client_id: Option<Uuid>,
66        pub(super) user_session_id: Option<Uuid>,
67        pub(super) user_id: Option<Uuid>,
68        pub(super) scope_list: Option<Vec<String>>,
69        pub(super) device_id: Option<String>,
70        pub(super) human_name: Option<String>,
71        pub(super) created_at: DateTime<Utc>,
72        pub(super) finished_at: Option<DateTime<Utc>>,
73        pub(super) is_synapse_admin: Option<bool>,
74        pub(super) user_agent: Option<String>,
75        pub(super) last_active_at: Option<DateTime<Utc>>,
76        pub(super) last_active_ip: Option<IpAddr>,
77    }
78}
79
80use priv_::{AppSessionLookup, AppSessionLookupIden};
81
82impl TryFrom<AppSessionLookup> for AppSession {
83    type Error = DatabaseError;
84
85    #[allow(clippy::too_many_lines)]
86    fn try_from(value: AppSessionLookup) -> Result<Self, Self::Error> {
87        // This is annoying to do, but we have to match on all the fields to determine
88        // whether it's a compat session or an oauth2 session
89        let AppSessionLookup {
90            cursor,
91            compat_session_id,
92            oauth2_session_id,
93            oauth2_client_id,
94            user_session_id,
95            user_id,
96            scope_list,
97            device_id,
98            human_name,
99            created_at,
100            finished_at,
101            is_synapse_admin,
102            user_agent,
103            last_active_at,
104            last_active_ip,
105        } = value;
106
107        let user_session_id = user_session_id.map(Ulid::from);
108
109        match (
110            compat_session_id,
111            oauth2_session_id,
112            oauth2_client_id,
113            user_id,
114            scope_list,
115            device_id,
116            is_synapse_admin,
117        ) {
118            (
119                Some(compat_session_id),
120                None,
121                None,
122                Some(user_id),
123                None,
124                device_id_opt,
125                Some(is_synapse_admin),
126            ) => {
127                let id = compat_session_id.into();
128                let device = device_id_opt
129                    .map(Device::try_from)
130                    .transpose()
131                    .map_err(|e| {
132                        DatabaseInconsistencyError::on("compat_sessions")
133                            .column("device_id")
134                            .row(id)
135                            .source(e)
136                    })?;
137
138                let state = match finished_at {
139                    None => CompatSessionState::Valid,
140                    Some(finished_at) => CompatSessionState::Finished { finished_at },
141                };
142
143                let session = CompatSession {
144                    id,
145                    state,
146                    user_id: user_id.into(),
147                    device,
148                    human_name,
149                    user_session_id,
150                    created_at,
151                    is_synapse_admin,
152                    user_agent,
153                    last_active_at,
154                    last_active_ip,
155                };
156
157                Ok(AppSession::Compat(Box::new(session)))
158            }
159
160            (
161                None,
162                Some(oauth2_session_id),
163                Some(oauth2_client_id),
164                user_id,
165                Some(scope_list),
166                None,
167                None,
168            ) => {
169                let id = oauth2_session_id.into();
170                let scope: Result<Scope, _> =
171                    scope_list.iter().map(|s| s.parse::<ScopeToken>()).collect();
172                let scope = scope.map_err(|e| {
173                    DatabaseInconsistencyError::on("oauth2_sessions")
174                        .column("scope")
175                        .row(id)
176                        .source(e)
177                })?;
178
179                let state = match value.finished_at {
180                    None => SessionState::Valid,
181                    Some(finished_at) => SessionState::Finished { finished_at },
182                };
183
184                let session = Session {
185                    id,
186                    state,
187                    created_at,
188                    client_id: oauth2_client_id.into(),
189                    user_id: user_id.map(Ulid::from),
190                    user_session_id,
191                    scope,
192                    user_agent,
193                    last_active_at,
194                    last_active_ip,
195                };
196
197                Ok(AppSession::OAuth2(Box::new(session)))
198            }
199
200            _ => Err(DatabaseInconsistencyError::on("sessions")
201                .row(cursor.into())
202                .into()),
203        }
204    }
205}
206
207/// Split a [`AppSessionFilter`] into two separate filters: a
208/// [`CompatSessionFilter`] and an [`OAuth2SessionFilter`].
209fn split_filter(
210    filter: AppSessionFilter<'_>,
211) -> (CompatSessionFilter<'_>, OAuth2SessionFilter<'_>) {
212    let mut compat_filter = CompatSessionFilter::new();
213    let mut oauth2_filter = OAuth2SessionFilter::new();
214
215    if let Some(user) = filter.user() {
216        compat_filter = compat_filter.for_user(user);
217        oauth2_filter = oauth2_filter.for_user(user);
218    }
219
220    match filter.state() {
221        Some(AppSessionState::Active) => {
222            compat_filter = compat_filter.active_only();
223            oauth2_filter = oauth2_filter.active_only();
224        }
225        Some(AppSessionState::Finished) => {
226            compat_filter = compat_filter.finished_only();
227            oauth2_filter = oauth2_filter.finished_only();
228        }
229        None => {}
230    }
231
232    if let Some(device) = filter.device() {
233        compat_filter = compat_filter.for_device(device);
234        oauth2_filter = oauth2_filter.for_device(device);
235    }
236
237    if let Some(browser_session) = filter.browser_session() {
238        compat_filter = compat_filter.for_browser_session(browser_session);
239        oauth2_filter = oauth2_filter.for_browser_session(browser_session);
240    }
241
242    if let Some(last_active_before) = filter.last_active_before() {
243        compat_filter = compat_filter.with_last_active_before(last_active_before);
244        oauth2_filter = oauth2_filter.with_last_active_before(last_active_before);
245    }
246
247    if let Some(last_active_after) = filter.last_active_after() {
248        compat_filter = compat_filter.with_last_active_after(last_active_after);
249        oauth2_filter = oauth2_filter.with_last_active_after(last_active_after);
250    }
251
252    (compat_filter, oauth2_filter)
253}
254
255#[async_trait]
256impl AppSessionRepository for PgAppSessionRepository<'_> {
257    type Error = DatabaseError;
258
259    #[allow(clippy::too_many_lines)]
260    #[tracing::instrument(
261        name = "db.app_session.list",
262        fields(
263            db.query.text,
264        ),
265        skip_all,
266        err,
267    )]
268    async fn list(
269        &mut self,
270        filter: AppSessionFilter<'_>,
271        pagination: Pagination,
272    ) -> Result<Page<AppSession>, Self::Error> {
273        let (compat_filter, oauth2_filter) = split_filter(filter);
274
275        let mut oauth2_session_select = Query::select()
276            .expr_as(
277                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
278                AppSessionLookupIden::Cursor,
279            )
280            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::CompatSessionId)
281            .expr_as(
282                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
283                AppSessionLookupIden::Oauth2SessionId,
284            )
285            .expr_as(
286                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)),
287                AppSessionLookupIden::Oauth2ClientId,
288            )
289            .expr_as(
290                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserSessionId)),
291                AppSessionLookupIden::UserSessionId,
292            )
293            .expr_as(
294                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)),
295                AppSessionLookupIden::UserId,
296            )
297            .expr_as(
298                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)),
299                AppSessionLookupIden::ScopeList,
300            )
301            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId)
302            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::HumanName)
303            .expr_as(
304                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)),
305                AppSessionLookupIden::CreatedAt,
306            )
307            .expr_as(
308                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::FinishedAt)),
309                AppSessionLookupIden::FinishedAt,
310            )
311            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::IsSynapseAdmin)
312            .expr_as(
313                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserAgent)),
314                AppSessionLookupIden::UserAgent,
315            )
316            .expr_as(
317                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt)),
318                AppSessionLookupIden::LastActiveAt,
319            )
320            .expr_as(
321                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveIp)),
322                AppSessionLookupIden::LastActiveIp,
323            )
324            .from(OAuth2Sessions::Table)
325            .apply_filter(oauth2_filter)
326            .clone();
327
328        let compat_session_select = Query::select()
329            .expr_as(
330                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
331                AppSessionLookupIden::Cursor,
332            )
333            .expr_as(
334                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
335                AppSessionLookupIden::CompatSessionId,
336            )
337            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2SessionId)
338            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2ClientId)
339            .expr_as(
340                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
341                AppSessionLookupIden::UserSessionId,
342            )
343            .expr_as(
344                Expr::col((CompatSessions::Table, CompatSessions::UserId)),
345                AppSessionLookupIden::UserId,
346            )
347            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::ScopeList)
348            .expr_as(
349                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
350                AppSessionLookupIden::DeviceId,
351            )
352            .expr_as(
353                Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
354                AppSessionLookupIden::HumanName,
355            )
356            .expr_as(
357                Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
358                AppSessionLookupIden::CreatedAt,
359            )
360            .expr_as(
361                Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
362                AppSessionLookupIden::FinishedAt,
363            )
364            .expr_as(
365                Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
366                AppSessionLookupIden::IsSynapseAdmin,
367            )
368            .expr_as(
369                Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
370                AppSessionLookupIden::UserAgent,
371            )
372            .expr_as(
373                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
374                AppSessionLookupIden::LastActiveAt,
375            )
376            .expr_as(
377                Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
378                AppSessionLookupIden::LastActiveIp,
379            )
380            .from(CompatSessions::Table)
381            .apply_filter(compat_filter)
382            .clone();
383
384        let common_table_expression = CommonTableExpression::new()
385            .query(
386                oauth2_session_select
387                    .union(UnionType::All, compat_session_select)
388                    .clone(),
389            )
390            .table_name(Alias::new("sessions"))
391            .clone();
392
393        let with_clause = Query::with().cte(common_table_expression).clone();
394
395        let select = Query::select()
396            .column(ColumnRef::Asterisk)
397            .from(Alias::new("sessions"))
398            .generate_pagination(AppSessionLookupIden::Cursor, pagination)
399            .clone();
400
401        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
402
403        let edges: Vec<AppSessionLookup> = sqlx::query_as_with(&sql, arguments)
404            .traced()
405            .fetch_all(&mut *self.conn)
406            .await?;
407
408        let page = pagination.process(edges).try_map(TryFrom::try_from)?;
409
410        Ok(page)
411    }
412
413    #[tracing::instrument(
414        name = "db.app_session.count",
415        fields(
416            db.query.text,
417        ),
418        skip_all,
419        err,
420    )]
421    async fn count(&mut self, filter: AppSessionFilter<'_>) -> Result<usize, Self::Error> {
422        let (compat_filter, oauth2_filter) = split_filter(filter);
423        let mut oauth2_session_select = Query::select()
424            .expr(Expr::cust("1"))
425            .from(OAuth2Sessions::Table)
426            .apply_filter(oauth2_filter)
427            .clone();
428
429        let compat_session_select = Query::select()
430            .expr(Expr::cust("1"))
431            .from(CompatSessions::Table)
432            .apply_filter(compat_filter)
433            .clone();
434
435        let common_table_expression = CommonTableExpression::new()
436            .query(
437                oauth2_session_select
438                    .union(UnionType::All, compat_session_select)
439                    .clone(),
440            )
441            .table_name(Alias::new("sessions"))
442            .clone();
443
444        let with_clause = Query::with().cte(common_table_expression).clone();
445
446        let select = Query::select()
447            .expr(Expr::cust("COUNT(*)"))
448            .from(Alias::new("sessions"))
449            .clone();
450
451        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
452
453        let count: i64 = sqlx::query_scalar_with(&sql, arguments)
454            .traced()
455            .fetch_one(&mut *self.conn)
456            .await?;
457
458        count
459            .try_into()
460            .map_err(DatabaseError::to_invalid_operation)
461    }
462
463    #[tracing::instrument(
464        name = "db.app_session.finish_sessions_to_replace_device",
465        fields(
466            db.query.text,
467            %user.id,
468            %device_id = device.as_str()
469        ),
470        skip_all,
471        err,
472    )]
473    async fn finish_sessions_to_replace_device(
474        &mut self,
475        clock: &dyn Clock,
476        user: &User,
477        device: &Device,
478    ) -> Result<(), Self::Error> {
479        // TODO need to invoke this from all the oauth2 login sites
480        let span = tracing::info_span!(
481            "db.app_session.finish_sessions_to_replace_device.compat_sessions",
482            { DB_QUERY_TEXT } = tracing::field::Empty,
483        );
484        let finished_at = clock.now();
485        sqlx::query!(
486            "
487                UPDATE compat_sessions SET finished_at = $3 WHERE user_id = $1 AND device_id = $2 AND finished_at IS NULL
488            ",
489            Uuid::from(user.id),
490            device.as_str(),
491            finished_at
492        )
493        .record(&span)
494        .execute(&mut *self.conn)
495        .instrument(span)
496        .await?;
497
498        if let Ok(device_as_scope_token) = device.to_scope_token() {
499            let span = tracing::info_span!(
500                "db.app_session.finish_sessions_to_replace_device.oauth2_sessions",
501                { DB_QUERY_TEXT } = tracing::field::Empty,
502            );
503            sqlx::query!(
504                "
505                    UPDATE oauth2_sessions SET finished_at = $3 WHERE user_id = $1 AND $2 = ANY(scope_list) AND finished_at IS NULL
506                ",
507                Uuid::from(user.id),
508                device_as_scope_token.as_str(),
509                finished_at
510            )
511            .record(&span)
512            .execute(&mut *self.conn)
513            .instrument(span)
514            .await?;
515        }
516
517        Ok(())
518    }
519}
520
521#[cfg(test)]
522mod tests {
523    use chrono::Duration;
524    use mas_data_model::Device;
525    use mas_storage::{
526        Pagination, RepositoryAccess,
527        app_session::{AppSession, AppSessionFilter},
528        clock::MockClock,
529        oauth2::OAuth2SessionRepository,
530    };
531    use oauth2_types::{
532        requests::GrantType,
533        scope::{OPENID, Scope},
534    };
535    use rand::SeedableRng;
536    use rand_chacha::ChaChaRng;
537    use sqlx::PgPool;
538
539    use crate::PgRepository;
540
541    #[sqlx::test(migrator = "crate::MIGRATOR")]
542    async fn test_app_repo(pool: PgPool) {
543        let mut rng = ChaChaRng::seed_from_u64(42);
544        let clock = MockClock::default();
545        let mut repo = PgRepository::from_pool(&pool).await.unwrap();
546
547        // Create a user
548        let user = repo
549            .user()
550            .add(&mut rng, &clock, "john".to_owned())
551            .await
552            .unwrap();
553
554        let all = AppSessionFilter::new().for_user(&user);
555        let active = all.active_only();
556        let finished = all.finished_only();
557        let pagination = Pagination::first(10);
558
559        assert_eq!(repo.app_session().count(all).await.unwrap(), 0);
560        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
561        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);
562
563        let full_list = repo.app_session().list(all, pagination).await.unwrap();
564        assert!(full_list.edges.is_empty());
565        let active_list = repo.app_session().list(active, pagination).await.unwrap();
566        assert!(active_list.edges.is_empty());
567        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
568        assert!(finished_list.edges.is_empty());
569
570        // Start a compat session for that user
571        let device = Device::generate(&mut rng);
572        let compat_session = repo
573            .compat_session()
574            .add(&mut rng, &clock, &user, device.clone(), None, false)
575            .await
576            .unwrap();
577
578        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
579        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
580        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);
581
582        let full_list = repo.app_session().list(all, pagination).await.unwrap();
583        assert_eq!(full_list.edges.len(), 1);
584        assert_eq!(
585            full_list.edges[0],
586            AppSession::Compat(Box::new(compat_session.clone()))
587        );
588        let active_list = repo.app_session().list(active, pagination).await.unwrap();
589        assert_eq!(active_list.edges.len(), 1);
590        assert_eq!(
591            active_list.edges[0],
592            AppSession::Compat(Box::new(compat_session.clone()))
593        );
594        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
595        assert!(finished_list.edges.is_empty());
596
597        // Finish the session
598        let compat_session = repo
599            .compat_session()
600            .finish(&clock, compat_session)
601            .await
602            .unwrap();
603
604        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
605        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
606        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);
607
608        let full_list = repo.app_session().list(all, pagination).await.unwrap();
609        assert_eq!(full_list.edges.len(), 1);
610        assert_eq!(
611            full_list.edges[0],
612            AppSession::Compat(Box::new(compat_session.clone()))
613        );
614        let active_list = repo.app_session().list(active, pagination).await.unwrap();
615        assert!(active_list.edges.is_empty());
616        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
617        assert_eq!(finished_list.edges.len(), 1);
618        assert_eq!(
619            finished_list.edges[0],
620            AppSession::Compat(Box::new(compat_session.clone()))
621        );
622
623        // Start an OAuth2 session
624        let client = repo
625            .oauth2_client()
626            .add(
627                &mut rng,
628                &clock,
629                vec!["https://example.com/redirect".parse().unwrap()],
630                None,
631                None,
632                None,
633                vec![GrantType::AuthorizationCode],
634                Some("First client".to_owned()),
635                Some("https://example.com/logo.png".parse().unwrap()),
636                Some("https://example.com/".parse().unwrap()),
637                Some("https://example.com/policy".parse().unwrap()),
638                Some("https://example.com/tos".parse().unwrap()),
639                Some("https://example.com/jwks.json".parse().unwrap()),
640                None,
641                None,
642                None,
643                None,
644                None,
645                Some("https://example.com/login".parse().unwrap()),
646            )
647            .await
648            .unwrap();
649
650        let device2 = Device::generate(&mut rng);
651        let scope = Scope::from_iter([OPENID, device2.to_scope_token().unwrap()]);
652
653        // We're moving the clock forward by 1 minute between each session to ensure
654        // we're getting consistent ordering in lists.
655        clock.advance(Duration::try_minutes(1).unwrap());
656
657        let oauth_session = repo
658            .oauth2_session()
659            .add(&mut rng, &clock, &client, Some(&user), None, scope)
660            .await
661            .unwrap();
662
663        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
664        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
665        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);
666
667        let full_list = repo.app_session().list(all, pagination).await.unwrap();
668        assert_eq!(full_list.edges.len(), 2);
669        assert_eq!(
670            full_list.edges[0],
671            AppSession::Compat(Box::new(compat_session.clone()))
672        );
673        assert_eq!(
674            full_list.edges[1],
675            AppSession::OAuth2(Box::new(oauth_session.clone()))
676        );
677
678        let active_list = repo.app_session().list(active, pagination).await.unwrap();
679        assert_eq!(active_list.edges.len(), 1);
680        assert_eq!(
681            active_list.edges[0],
682            AppSession::OAuth2(Box::new(oauth_session.clone()))
683        );
684
685        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
686        assert_eq!(finished_list.edges.len(), 1);
687        assert_eq!(
688            finished_list.edges[0],
689            AppSession::Compat(Box::new(compat_session.clone()))
690        );
691
692        // Finish the session
693        let oauth_session = repo
694            .oauth2_session()
695            .finish(&clock, oauth_session)
696            .await
697            .unwrap();
698
699        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
700        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
701        assert_eq!(repo.app_session().count(finished).await.unwrap(), 2);
702
703        let full_list = repo.app_session().list(all, pagination).await.unwrap();
704        assert_eq!(full_list.edges.len(), 2);
705        assert_eq!(
706            full_list.edges[0],
707            AppSession::Compat(Box::new(compat_session.clone()))
708        );
709        assert_eq!(
710            full_list.edges[1],
711            AppSession::OAuth2(Box::new(oauth_session.clone()))
712        );
713
714        let active_list = repo.app_session().list(active, pagination).await.unwrap();
715        assert!(active_list.edges.is_empty());
716
717        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
718        assert_eq!(finished_list.edges.len(), 2);
719        assert_eq!(
720            finished_list.edges[0],
721            AppSession::Compat(Box::new(compat_session.clone()))
722        );
723        assert_eq!(
724            full_list.edges[1],
725            AppSession::OAuth2(Box::new(oauth_session.clone()))
726        );
727
728        // Query by device
729        let filter = AppSessionFilter::new().for_device(&device);
730        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
731        let list = repo.app_session().list(filter, pagination).await.unwrap();
732        assert_eq!(list.edges.len(), 1);
733        assert_eq!(
734            list.edges[0],
735            AppSession::Compat(Box::new(compat_session.clone()))
736        );
737
738        let filter = AppSessionFilter::new().for_device(&device2);
739        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
740        let list = repo.app_session().list(filter, pagination).await.unwrap();
741        assert_eq!(list.edges.len(), 1);
742        assert_eq!(
743            list.edges[0],
744            AppSession::OAuth2(Box::new(oauth_session.clone()))
745        );
746
747        // Create a second user
748        let user2 = repo
749            .user()
750            .add(&mut rng, &clock, "alice".to_owned())
751            .await
752            .unwrap();
753
754        // If we list/count for this user, we should get nothing
755        let filter = AppSessionFilter::new().for_user(&user2);
756        assert_eq!(repo.app_session().count(filter).await.unwrap(), 0);
757        let list = repo.app_session().list(filter, pagination).await.unwrap();
758        assert!(list.edges.is_empty());
759    }
760}