Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2459,6 +2459,8 @@ pub enum CommentObject {
Schema,
/// A sequence.
Sequence,
/// A subscription.
Subscription,
/// A table.
Table,
/// A type.
Expand All @@ -2483,6 +2485,7 @@ impl fmt::Display for CommentObject {
CommentObject::Role => f.write_str("ROLE"),
CommentObject::Schema => f.write_str("SCHEMA"),
CommentObject::Sequence => f.write_str("SEQUENCE"),
CommentObject::Subscription => f.write_str("SUBSCRIPTION"),
CommentObject::Table => f.write_str("TABLE"),
CommentObject::Type => f.write_str("TYPE"),
CommentObject::User => f.write_str("USER"),
Expand Down Expand Up @@ -3682,6 +3685,12 @@ pub enum Statement {
/// A `CREATE SERVER` statement.
CreateServer(CreateServerStatement),
/// ```sql
/// CREATE SUBSCRIPTION
/// ```
///
/// Note: this is a PostgreSQL-specific statement.
CreateSubscription(CreateSubscription),
/// ```sql
/// CREATE POLICY
/// ```
/// See [PostgreSQL](https://www.postgresql.org/docs/current/sql-createpolicy.html)
Expand Down Expand Up @@ -3768,6 +3777,12 @@ pub enum Statement {
operation: AlterRoleOperation,
},
/// ```sql
/// ALTER SUBSCRIPTION
/// ```
///
/// Note: this is a PostgreSQL-specific statement.
AlterSubscription(AlterSubscription),
/// ```sql
/// ALTER POLICY <NAME> ON <TABLE NAME> [<OPERATION>]
/// ```
/// (Postgresql-specific)
Expand Down Expand Up @@ -5436,6 +5451,9 @@ impl fmt::Display for Statement {
Statement::CreateServer(stmt) => {
write!(f, "{stmt}")
}
Statement::CreateSubscription(stmt) => {
write!(f, "{stmt}")
}
Statement::CreatePolicy(policy) => write!(f, "{policy}"),
Statement::CreateConnector(create_connector) => create_connector.fmt(f),
Statement::CreateOperator(create_operator) => create_operator.fmt(f),
Expand Down Expand Up @@ -5475,6 +5493,9 @@ impl fmt::Display for Statement {
Statement::AlterRole { name, operation } => {
write!(f, "ALTER ROLE {name} {operation}")
}
Statement::AlterSubscription(alter_subscription) => {
write!(f, "{alter_subscription}")
}
Statement::AlterPolicy(alter_policy) => write!(f, "{alter_policy}"),
Statement::AlterConnector {
name,
Expand Down Expand Up @@ -6760,6 +6781,11 @@ pub enum Action {
BindServiceEndpoint,
/// Connect permission.
Connect,
/// Custom privilege name (primarily PostgreSQL).
Custom {
/// The custom privilege identifier.
name: Ident,
},
/// Create action, optionally specifying an object type.
Create {
/// Optional object type to create.
Expand Down Expand Up @@ -6874,6 +6900,7 @@ impl fmt::Display for Action {
Action::Audit => f.write_str("AUDIT")?,
Action::BindServiceEndpoint => f.write_str("BIND SERVICE ENDPOINT")?,
Action::Connect => f.write_str("CONNECT")?,
Action::Custom { name } => write!(f, "{name}")?,
Action::Create { obj_type } => {
f.write_str("CREATE")?;
if let Some(obj_type) = obj_type {
Expand Down Expand Up @@ -8246,6 +8273,8 @@ pub enum ObjectType {
Role,
/// A sequence.
Sequence,
/// A subscription.
Subscription,
/// A stage.
Stage,
/// A type definition.
Expand All @@ -8267,6 +8296,7 @@ impl fmt::Display for ObjectType {
ObjectType::Database => "DATABASE",
ObjectType::Role => "ROLE",
ObjectType::Sequence => "SEQUENCE",
ObjectType::Subscription => "SUBSCRIPTION",
ObjectType::Stage => "STAGE",
ObjectType::Type => "TYPE",
ObjectType::User => "USER",
Expand Down Expand Up @@ -8770,6 +8800,206 @@ impl fmt::Display for CreateServerOption {
}
}

/// A subscription option used by `CREATE/ALTER SUBSCRIPTION`.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub struct SubscriptionOption {
/// Subscription parameter name.
pub name: Ident,
/// Optional parameter value.
pub value: Option<Expr>,
}

impl fmt::Display for SubscriptionOption {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(value) = &self.value {
write!(f, "{} = {}", self.name, value)
} else {
write!(f, "{}", self.name)
}
}
}

/// A `CREATE SUBSCRIPTION` statement.
///
/// [PostgreSQL Documentation](https://www.postgresql.org/docs/current/sql-createsubscription.html)
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub struct CreateSubscription {
/// Subscription name.
pub name: ObjectName,
/// Connection string.
pub connection: String,
/// Publication names.
pub publications: Vec<Ident>,
/// Optional subscription parameters from `WITH (...)`.
pub with_options: Vec<SubscriptionOption>,
}

impl fmt::Display for CreateSubscription {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"CREATE SUBSCRIPTION {} CONNECTION '{}' PUBLICATION {}",
self.name,
value::escape_single_quote_string(&self.connection),
display_comma_separated(&self.publications)
)?;

if !self.with_options.is_empty() {
write!(f, " WITH ({})", display_comma_separated(&self.with_options))?;
}

Ok(())
}
}

/// An `ALTER SUBSCRIPTION` statement.
///
/// [PostgreSQL Documentation](https://www.postgresql.org/docs/current/sql-altersubscription.html)
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub struct AlterSubscription {
/// Subscription name.
pub name: ObjectName,
/// Operation to perform.
pub operation: AlterSubscriptionOperation,
}

impl fmt::Display for AlterSubscription {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "ALTER SUBSCRIPTION {} {}", self.name, self.operation)
}
}

/// Operations supported by `ALTER SUBSCRIPTION`.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub enum AlterSubscriptionOperation {
/// Update the subscription connection string.
Connection {
/// New connection string.
connection: String,
},
/// Replace subscription publications.
SetPublication {
/// Publication names.
publications: Vec<Ident>,
/// Optional `WITH (...)` parameters.
with_options: Vec<SubscriptionOption>,
},
/// Add publications to the subscription.
AddPublication {
/// Publication names.
publications: Vec<Ident>,
/// Optional `WITH (...)` parameters.
with_options: Vec<SubscriptionOption>,
},
/// Drop publications from the subscription.
DropPublication {
/// Publication names.
publications: Vec<Ident>,
/// Optional `WITH (...)` parameters.
with_options: Vec<SubscriptionOption>,
},
/// Refresh subscription publications.
RefreshPublication {
/// Optional `WITH (...)` parameters.
with_options: Vec<SubscriptionOption>,
},
/// Enable the subscription.
Enable,
/// Disable the subscription.
Disable,
/// Set subscription parameters.
SetOptions {
/// Parameters within `SET (...)`.
options: Vec<SubscriptionOption>,
},
/// Change subscription owner.
OwnerTo {
/// New owner.
owner: Owner,
},
/// Rename the subscription.
RenameTo {
/// New subscription name.
new_name: ObjectName,
},
}

impl fmt::Display for AlterSubscriptionOperation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn write_with_options(
f: &mut fmt::Formatter<'_>,
with_options: &[SubscriptionOption],
) -> fmt::Result {
if !with_options.is_empty() {
write!(f, " WITH ({})", display_comma_separated(with_options))?;
}
Ok(())
}

match self {
AlterSubscriptionOperation::Connection { connection } => {
write!(
f,
"CONNECTION '{}'",
value::escape_single_quote_string(connection)
)
}
AlterSubscriptionOperation::SetPublication {
publications,
with_options,
} => {
write!(
f,
"SET PUBLICATION {}",
display_comma_separated(publications)
)?;
write_with_options(f, with_options)
}
AlterSubscriptionOperation::AddPublication {
publications,
with_options,
} => {
write!(
f,
"ADD PUBLICATION {}",
display_comma_separated(publications)
)?;
write_with_options(f, with_options)
}
AlterSubscriptionOperation::DropPublication {
publications,
with_options,
} => {
write!(
f,
"DROP PUBLICATION {}",
display_comma_separated(publications)
)?;
write_with_options(f, with_options)
}
AlterSubscriptionOperation::RefreshPublication { with_options } => {
write!(f, "REFRESH PUBLICATION")?;
write_with_options(f, with_options)
}
AlterSubscriptionOperation::Enable => write!(f, "ENABLE"),
AlterSubscriptionOperation::Disable => write!(f, "DISABLE"),
AlterSubscriptionOperation::SetOptions { options } => {
write!(f, "SET ({})", display_comma_separated(options))
}
AlterSubscriptionOperation::OwnerTo { owner } => write!(f, "OWNER TO {owner}"),
AlterSubscriptionOperation::RenameTo { new_name } => write!(f, "RENAME TO {new_name}"),
}
}
}

#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
Expand Down Expand Up @@ -11669,6 +11899,8 @@ impl fmt::Display for VacuumStatement {
pub enum Reset {
/// Resets all session parameters to their default values.
ALL,
/// Resets session authorization to the session user.
SessionAuthorization,

/// Resets a specific session parameter to its default value.
ConfigurationParameter(ObjectName),
Expand Down Expand Up @@ -11751,6 +11983,7 @@ impl fmt::Display for ResetStatement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &self.reset {
Reset::ALL => write!(f, "RESET ALL"),
Reset::SessionAuthorization => write!(f, "RESET SESSION AUTHORIZATION"),
Reset::ConfigurationParameter(param) => write!(f, "RESET {}", param),
}
}
Expand Down Expand Up @@ -11888,6 +12121,12 @@ impl From<CreateServerStatement> for Statement {
}
}

impl From<CreateSubscription> for Statement {
fn from(c: CreateSubscription) -> Self {
Self::CreateSubscription(c)
}
}

impl From<CreateConnector> for Statement {
fn from(c: CreateConnector) -> Self {
Self::CreateConnector(c)
Expand Down Expand Up @@ -11918,6 +12157,12 @@ impl From<AlterSchema> for Statement {
}
}

impl From<AlterSubscription> for Statement {
fn from(a: AlterSubscription) -> Self {
Self::AlterSubscription(a)
}
}

impl From<AlterType> for Statement {
fn from(a: AlterType) -> Self {
Self::AlterType(a)
Expand Down
2 changes: 2 additions & 0 deletions src/ast/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ impl Spanned for Statement {
Statement::DropOperatorClass(drop_operator_class) => drop_operator_class.span(),
Statement::CreateSecret { .. } => Span::empty(),
Statement::CreateServer { .. } => Span::empty(),
Statement::CreateSubscription { .. } => Span::empty(),
Statement::CreateConnector { .. } => Span::empty(),
Statement::CreateOperator(create_operator) => create_operator.span(),
Statement::CreateOperatorFamily(create_operator_family) => {
Expand All @@ -407,6 +408,7 @@ impl Spanned for Statement {
Statement::AlterOperatorFamily { .. } => Span::empty(),
Statement::AlterOperatorClass { .. } => Span::empty(),
Statement::AlterRole { .. } => Span::empty(),
Statement::AlterSubscription { .. } => Span::empty(),
Statement::AlterSession { .. } => Span::empty(),
Statement::AttachDatabase { .. } => Span::empty(),
Statement::AttachDuckDBDatabase { .. } => Span::empty(),
Expand Down
2 changes: 2 additions & 0 deletions src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ define_keywords!(
PROGRAM,
PROJECTION,
PUBLIC,
PUBLICATION,
PURCHASE,
PURGE,
QUALIFY,
Expand Down Expand Up @@ -994,6 +995,7 @@ define_keywords!(
STRUCT,
SUBMULTISET,
SUBSCRIPT,
SUBSCRIPTION,
SUBSTR,
SUBSTRING,
SUBSTRING_REGEX,
Expand Down
Loading
Loading