Skip to content
4 changes: 2 additions & 2 deletions appliance/postgresql/cmd/flynn-postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func main() {
BinDir: "/usr/lib/postgresql/11/bin/",
Password: password,
Logger: log.New("component", "postgres"),
TimescaleDB: true,
ExtWhitelist: true,
TimescaleDB: false,
ExtWhitelist: false,
WaitUpstream: true,
SHMType: "posix",
})
Expand Down
65 changes: 60 additions & 5 deletions appliance/postgresql/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,13 @@ func (p *Process) assumePrimary(downstream *discoverd.Instance) (err error) {
return err
}

if err := p.writeConfig(configData{ReadOnly: downstream != nil}); err != nil {
// Start read-write during initial primary setup. The database was
// just created with initdb so there is no user data to protect. We
// need read-write access to create the superuser and install
// extensions. Once the sync standby catches up, waitForSync will
// rewrite the config with the sync standby name (which also drops
// ReadOnly, making default_transaction_read_only = off permanent).
if err := p.writeConfig(configData{}); err != nil {
log.Error("error writing postgres.conf", "path", p.configPath(), "err", err)
return err
}
Expand All @@ -399,10 +405,6 @@ func (p *Process) assumePrimary(downstream *discoverd.Instance) (err error) {
log.Error("error acquiring connection", "err", err)
return err
}
if _, err := tx.Exec("SET TRANSACTION READ WRITE"); err != nil {
log.Error("error setting transaction read-write", "err", err)
return err
}
if _, err := tx.Exec(fmt.Sprintf(`
DO
$body$
Expand All @@ -424,13 +426,66 @@ func (p *Process) assumePrimary(downstream *discoverd.Instance) (err error) {
return err
}

// Pre-install commonly needed extensions in template1 so they are
// inherited by all databases created via CREATE DATABASE. This is
// required because application database users are not superusers
// and cannot run CREATE EXTENSION themselves (pgextwlist is not
// available in the current packages layer).
if extErr := p.installExtensionsInTemplate(); extErr != nil {
log.Error("error installing extensions in template1", "err", extErr)
return extErr
}

if downstream != nil {
// Now that setup is complete, switch to read-only mode to
// prevent user writes before the sync standby catches up.
// waitForSync will rewrite the config with the sync name
// (which drops ReadOnly) once replication is caught up.
if err := p.writeConfig(configData{ReadOnly: true}); err != nil {
log.Error("error writing read-only config", "err", err)
return err
}
if err := p.sighup(); err != nil {
log.Error("error reloading config for read-only mode", "err", err)
return err
}
p.waitForSync(downstream, true)
}

return nil
}

// installExtensionsInTemplate pre-installs commonly needed PostgreSQL
// extensions in template1 so they are inherited by all databases
// created via CREATE DATABASE.
func (p *Process) installExtensionsInTemplate() error {
port, _ := strconv.Atoi(p.port)
templateDB, err := pgx.Connect(pgx.ConnConfig{
Host: "127.0.0.1",
User: "postgres",
Port: uint16(port),
Database: "template1",
})
if err != nil {
return fmt.Errorf("connecting to template1: %s", err)
}
defer templateDB.Close()

// When the primary has a downstream sync replica, postgresql.conf sets
// default_transaction_read_only = on. We need to override that for
// this session so that CREATE EXTENSION can perform writes.
if _, err := templateDB.Exec("SET default_transaction_read_only = off"); err != nil {
return fmt.Errorf("setting read-write mode on template1 connection: %s", err)
}

for _, ext := range []string{"uuid-ossp", "pgcrypto"} {
if _, err := templateDB.Exec(fmt.Sprintf(`CREATE EXTENSION IF NOT EXISTS "%s"`, ext)); err != nil {
return fmt.Errorf("creating extension %s in template1: %s", ext, err)
}
}
return nil
}

func (p *Process) assumeStandby(upstream, downstream *discoverd.Instance) error {
log := p.log.New("fn", "assumeStandby", "upstream", upstream.Addr)
log.Info("starting up as standby")
Expand Down
4 changes: 2 additions & 2 deletions bootstrap/sirenia_wait_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (a *SireniaWaitAction) Run(s *State) error {
var leader *discoverd.Instance
err = attempt.Strategy{
Min: 5,
Total: 5 * time.Minute,
Total: 10 * time.Minute,
Delay: 500 * time.Millisecond,
}.Run(func() error {
leader, err = d.Service(a.Service).Leader()
Expand All @@ -37,5 +37,5 @@ func (a *SireniaWaitAction) Run(s *State) error {
}

// connect using sirenia client and wait until database reports read/write
return client.NewClient(leader.Addr).WaitForReadWrite(5 * time.Minute)
return client.NewClient(leader.Addr).WaitForReadWrite(10 * time.Minute)
}
14 changes: 10 additions & 4 deletions bootstrap/wait_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,25 @@ import (
)

type WaitAction struct {
URL string `json:"url"`
Host string `json:"host"`
Status int `json:"status"`
URL string `json:"url"`
Host string `json:"host"`
Status int `json:"status"`
Timeout int `json:"timeout"` // seconds, 0 = use default (10 minutes)
}

func init() {
Register("wait", &WaitAction{})
}

func (a *WaitAction) Run(s *State) error {
const waitMax = 5 * time.Minute
const defaultWaitMax = 10 * time.Minute
const waitInterval = 500 * time.Millisecond

waitMax := defaultWaitMax
if a.Timeout > 0 {
waitMax = time.Duration(a.Timeout) * time.Second
}

if a.Status == 0 {
a.Status = 200
}
Expand Down
2 changes: 1 addition & 1 deletion cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ See 'flynn help <command>' for more information on a specific command.
return
}

fmt.Fprintln(os.Stderr, "WARNING: Flynn is unmaintained and new installs will fail on June 1. See: https://github.com/flynn/flynn")
// Original unmaintained warning removed — this project is actively maintained.
}

type command struct {
Expand Down
56 changes: 54 additions & 2 deletions flannel/backend/vxlan/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,44 @@ func newVXLANDevice(devAttrs *vxlanDeviceAttrs) (*vxlanDevice, error) {
return nil, err
}

// Generate a unique MAC address derived from the VTEP IP to ensure
// each node's flannel.1 device has a distinct hardware address.
// Without this, nodes cloned from the same image get identical MACs
// which breaks VXLAN forwarding.
//
// IMPORTANT: Only set the MAC if it differs from the current one.
// Setting the MAC (even to the same value) causes the kernel to
// flush all ARP neighbor entries on the interface, which breaks
// overlay connectivity when flannel instances crash-loop and
// repeatedly reuse the existing device.
if ip4 := devAttrs.vtepAddr.To4(); ip4 != nil {
mac := net.HardwareAddr{0x02, 0x42, ip4[0], ip4[1], ip4[2], ip4[3]}
currentMAC := link.HardwareAddr
if !macEqual(currentMAC, mac) {
if err := netlink.LinkSetHardwareAddr(link, mac); err != nil {
// If setting MAC while UP fails, try bringing the link down first
log.Warningf("failed to set MAC on %s while UP: %v; retrying with link down", link.Name, err)
if err2 := netlink.LinkSetDown(link); err2 != nil {
log.Warningf("failed to bring %s down: %v", link.Name, err2)
}
if err2 := netlink.LinkSetHardwareAddr(link, mac); err2 != nil {
log.Errorf("failed to set unique MAC on %s even after link down: %v", link.Name, err2)
} else {
link.HardwareAddr = mac
log.Infof("set VXLAN device %s MAC to %s (after link down, derived from VTEP IP %s)", link.Name, mac, devAttrs.vtepAddr)
}
if err2 := netlink.LinkSetUp(link); err2 != nil {
log.Warningf("failed to bring %s back up: %v", link.Name, err2)
}
} else {
link.HardwareAddr = mac
log.Infof("set VXLAN device %s MAC to %s (derived from VTEP IP %s)", link.Name, mac, devAttrs.vtepAddr)
}
} else {
log.Infof("VXLAN device %s already has correct MAC %s, skipping set", link.Name, mac)
}
}

return &vxlanDevice{
link: link,
}, nil
Expand All @@ -56,6 +94,7 @@ func ensureLink(vxlan *netlink.Vxlan) (*netlink.Vxlan, error) {

incompat := vxlanLinksIncompat(vxlan, existing)
if incompat == "" {
log.Infof("reusing existing %q device", vxlan.Name)
return existing.(*netlink.Vxlan), nil
}

Expand Down Expand Up @@ -113,6 +152,19 @@ func (dev *vxlanDevice) Destroy() {
netlink.LinkDel(dev.link)
}

// macEqual compares two hardware addresses for equality.
func macEqual(a, b net.HardwareAddr) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

func (dev *vxlanDevice) MACAddr() net.HardwareAddr {
return dev.link.HardwareAddr
}
Expand All @@ -127,8 +179,8 @@ type neigh struct {
}

func (dev *vxlanDevice) AddL2(n neigh) error {
log.Infof("calling NeighAdd: %v, %v", n.IP, n.MAC)
return netlink.NeighAdd(&netlink.Neigh{
log.Infof("calling NeighSet (L2/FDB): %v, %v", n.IP, n.MAC)
return netlink.NeighSet(&netlink.Neigh{
LinkIndex: dev.link.Index,
State: netlink.NUD_PERMANENT,
Family: syscall.AF_BRIDGE,
Expand Down
Loading
Loading