# STATION DATA TO CSV
"""Export the latest observation of each configured station to CSV for wis2box.

For every station id in ``station_id`` the script:

1. reads station metadata, the most recent raw observation, the last 24
   hourly precipitation sums and the last 3 hourly gust maxima from the
   surface database;
2. writes them as a single-row CSV file in the current directory;
3. uploads that file to the ``wis2box-incoming`` bucket of the wis2box
   MinIO storage.

Progress and errors are appended to ``wis2_data_trans_multi.log``.
"""
import csv
import glob
import logging
import sys
from contextlib import closing

import psycopg2
from minio import Minio
from minio.error import S3Error

# Configure logging
logging.basicConfig(filename='wis2_data_trans_multi.log', filemode='a', level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

# SECURITY: credentials are hard-coded in source control. They are kept here so
# existing deployments keep working, but they should be moved to environment
# variables or a secrets store (and rotated).

# Database connection parameters
dbname = 'surface_db'
user = 'dba'
password = 'wjxm6ZFL9igbfjkd6DKH73Nnsk'
host = '192.168.3.180'
port = '5432'

# Station id data will be retrieved from
station_id = [108]

# wis2box (MinIO) upload parameters
minio_path = 'bz-nms/data/core/weather/surface-based-observations/synop'
endpoint = '136.156.131.145:9000'
WIS2BOX_STORAGE_USERNAME = 'wis2box'
WIS2BOX_STORAGE_PASSWORD = 'HhQf6W5n'
is_secure = False
WIS2BOX_BUCKET = 'wis2box-incoming'

# CSV columns, in output order. Columns no query fills in are written empty.
CSV_COLUMNS = (
    'wsi_series',
    'wsi_issuer',
    'wsi_issue_number',
    'wsi_local',
    'wmo_block_number',
    'wmo_station_number',
    'station_type',
    'year',
    'month',
    'day',
    'hour',
    'minute',
    'latitude',
    'longitude',
    'station_height_above_msl',
    'barometer_height_above_msl',
    'station_pressure',
    'msl_pressure',
    'geopotential_height',
    'thermometer_height',
    'air_temperature',
    'dewpoint_temperature',
    'relative_humidity',
    'method_of_ground_state_measurement',
    'ground_state',
    'method_of_snow_depth_measurement',
    'snow_depth',
    'precipitation_intensity',
    'anemometer_height',
    'time_period_of_wind',
    'wind_direction',
    'wind_speed',
    'maximum_wind_gust_direction_10_minutes',
    'maximum_wind_gust_speed_10_minutes',
    'maximum_wind_gust_direction_1_hour',
    'maximum_wind_gust_speed_1_hour',
    'maximum_wind_gust_direction_3_hours',
    'maximum_wind_gust_speed_3_hours',
    'rain_sensor_height',
    'total_precipitation_1_hour',
    'total_precipitation_3_hours',
    'total_precipitation_6_hours',
    'total_precipitation_12_hours',
    'total_precipitation_24_hours',
)

# (query, result_info) pairs executed for every station, in this order.
# The station id is always passed as a bound parameter, never interpolated.
STATION_QUERIES = (
    ("SELECT wigos, latitude, longitude, elevation FROM wx_station WHERE id = %(station_id)s;",
     "station_info"),
    ("SELECT variable_id, measured, datetime FROM raw_data rd "
     "WHERE station_id = %(station_id)s "
     "AND datetime = (SELECT MAX(datetime) FROM hourly_summary WHERE station_id = %(station_id)s) "
     "AND variable_id IN (10, 19, 30, 51, 60, 61);",
     "raw_data"),
    ("SELECT sum_value FROM hourly_summary hs WHERE station_id = %(station_id)s "
     "AND variable_id = 0 ORDER BY datetime desc limit 24",
     "precip_hourly_data"),
    ("select max_value from hourly_summary hs where station_id = %(station_id)s "
     "and variable_id = 53 ORDER BY datetime desc LIMIT 3;",
     "wind_hourly_data"),
)

# raw_data variable_id -> (CSV column, conversion applied to the measured value).
# The conversions are the ones the script has always applied: +273.15 for the
# two temperatures, *100 for the two pressures, rounding for the others.
RAW_VARIABLES = {
    10: ('air_temperature', lambda v: round(v + 273.15, 2)),
    19: ('dewpoint_temperature', lambda v: round(v + 273.15, 2)),
    30: ('relative_humidity', lambda v: round(v)),
    51: ('wind_speed', lambda v: round(v, 1)),
    60: ('station_pressure', lambda v: v * 100),
    61: ('msl_pressure', lambda v: v * 100),
}


def new_data_row():
    """Return a fresh CSV row with every column set to ``None``."""
    return dict.fromkeys(CSV_COLUMNS)


def _apply_station_info(data_row, rows):
    """Fill identifier, position and fixed sensor-height columns.

    ``rows[0]`` is ``(wigos, latitude, longitude, elevation)``.
    """
    wigos, latitude, longitude, elevation = rows[0][:4]
    if wigos:
        parts = wigos.split(".")
        if len(parts) >= 4:
            data_row['wsi_series'] = parts[0]
            data_row['wsi_issuer'] = parts[1]
            data_row['wsi_issue_number'] = parts[2]
            data_row['wsi_local'] = parts[3]
        else:
            # A malformed identifier leaves the wsi_* columns empty instead of
            # aborting the whole run with an IndexError.
            logging.warning("Unexpected WIGOS identifier format: %r", wigos)
    data_row['station_type'] = 0  # Until further notice
    data_row['latitude'] = round(latitude, 5)
    data_row['longitude'] = round(longitude, 5)
    data_row['station_height_above_msl'] = elevation
    # 1.7 metre above station_height_above_msl
    data_row['barometer_height_above_msl'] = round(elevation + 1.7, 2)
    data_row['thermometer_height'] = 1.7
    data_row['anemometer_height'] = 10
    data_row['time_period_of_wind'] = -5
    data_row['rain_sensor_height'] = 1.5


def _apply_raw_data(data_row, rows):
    """Fill the observation time and the instantaneous measurements.

    Each row is ``(variable_id, measured, datetime)``. All rows share one
    timestamp because the query selects a single ``datetime``.
    """
    # NOTE(review): the original comments call this a UTC datetime; the code
    # uses the value exactly as stored - confirm the column is in UTC.
    observed_at = rows[0][2]
    data_row['year'] = observed_at.year
    data_row['month'] = observed_at.month
    data_row['day'] = observed_at.day
    data_row['hour'] = observed_at.hour
    data_row['minute'] = observed_at.minute

    for row in rows:
        variable_id, measured = row[0], row[1]
        target = RAW_VARIABLES.get(variable_id)
        if target is None or measured is None:
            # Unknown variable, or NULL measurement: leave the column empty.
            continue
        column, convert = target
        data_row[column] = convert(measured)


def _window_total(values, hours):
    """Return the rounded sum of the newest ``hours`` values, or ``None``.

    ``None`` is returned when any value inside the window is NULL, because no
    total can be computed from it.
    """
    window = values[:hours]
    if any(value is None for value in window):
        return None
    return round(sum(window), 1)


def _apply_precipitation(data_row, rows):
    """Fill the 1/3/6/12/24-hour precipitation totals (rows are newest first)."""
    precip_hourly = [row[0] for row in rows]
    # NOTE(review): as before, a window is summed even when fewer hourly rows
    # than its length exist, so early totals may cover a shorter period.
    for hours, column in (
        (1, 'total_precipitation_1_hour'),
        (3, 'total_precipitation_3_hours'),
        (6, 'total_precipitation_6_hours'),
        (12, 'total_precipitation_12_hours'),
        (24, 'total_precipitation_24_hours'),
    ):
        data_row[column] = _window_total(precip_hourly, hours)


def _apply_wind_gusts(data_row, rows):
    """Fill the 1-hour and 3-hour maximum gust speeds (rows are newest first)."""
    wind_hourly = [row[0] for row in rows]
    if wind_hourly[0] is not None:
        data_row['maximum_wind_gust_speed_1_hour'] = round(wind_hourly[0], 1)
    # max() replaces a truthiness check that treated a 0 maximum as "unset".
    gusts = [value for value in wind_hourly if value is not None]
    if gusts:
        data_row['maximum_wind_gust_speed_3_hours'] = round(max(gusts), 1)


# result_info label -> function that copies the query result into data_row
_RESULT_HANDLERS = {
    "station_info": _apply_station_info,
    "raw_data": _apply_raw_data,
    "precip_hourly_data": _apply_precipitation,
    "wind_hourly_data": _apply_wind_gusts,
}


def populate_data_row(connection, data_row, query, params, result_info):
    """Run ``query`` and copy its result into ``data_row``.

    Args:
        connection: open psycopg2 connection.
        data_row: CSV row dictionary, updated in place.
        query: SQL text using ``%(name)s`` placeholders.
        params: mapping of placeholder names to values.
        result_info: one of the keys of ``_RESULT_HANDLERS``; selects how the
            result rows are interpreted.

    An empty result or an unknown ``result_info`` leaves ``data_row`` untouched.
    """
    with connection.cursor() as cursor:
        cursor.execute(query, params)
        rows = cursor.fetchall()
    if not rows:
        return
    handler = _RESULT_HANDLERS.get(result_info)
    if handler is not None:
        handler(data_row, rows)


# Function to set tag for file based on hour
def set_tag(hour, minute):
    """Return the filename tag for an observation taken at ``hour``:``minute``.

    On the full hour the tag depends on the hour: divisible by 6, divisible by
    3, or any other hour. Off the full hour the tag is ``'other'``.
    """
    if minute != 0:
        return 'other'
    if hour % 6 == 0:
        return 'main,ISME01,MZBZ'
    if hour % 3 == 0:
        return 'intermediate,ISIE01,MZBZ'
    return 'non-standard,ISNE01,MZBZ'


def write_csv(data_row, csv_filename):
    """Write ``data_row`` as a header line plus one data line."""
    with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.DictWriter(csvfile, fieldnames=list(data_row))
        csv_writer.writeheader()
        csv_writer.writerow(data_row)


def upload_csv(csv_filename, station_pk):
    """Upload ``csv_filename`` to the wis2box incoming bucket.

    Storage errors (``S3Error``) are logged and not re-raised, so a failed
    upload does not stop the remaining stations.
    """
    filepath = f'./{csv_filename}'
    try:
        client = Minio(
            endpoint=endpoint,
            access_key=WIS2BOX_STORAGE_USERNAME,
            secret_key=WIS2BOX_STORAGE_PASSWORD,
            secure=is_secure)
        # The '/' separator matters: without it the object key becomes
        # '.../synopwmo_data_...csv' instead of sitting under '.../synop/'.
        object_name = f"{minio_path.rstrip('/')}/{csv_filename}"
        client.fput_object(WIS2BOX_BUCKET, object_name, filepath)
        # Log that transfer to wis2box was successful
        logging.info(f"Data transfer to regional Wis2Box successful for Station Id: {station_pk}")
        print(f"Data transfer to regional Wis2Box successful for Station Id: {station_pk}")
    except S3Error as e:
        # Log the error if a minio error occurs
        logging.error(f"regional Wis2Box Connection Error: {e}")


def process_station(connection, station_pk):
    """Query, export and upload the latest observation of one station."""
    logging.info(f'Processing data for Station ID: {station_pk}')

    data_row = new_data_row()
    params = {"station_id": station_pk}
    for query, result_info in STATION_QUERIES:
        populate_data_row(connection, data_row, query, params, result_info)

    if data_row['hour'] is None:
        # No raw observation matched; set_tag() cannot work without a time.
        logging.warning(f"No observation found for Station ID: {station_pk}; skipping")
        return

    tag = set_tag(data_row['hour'], data_row['minute'])

    # NOTE(review): the tag contains commas, which end up in the file name and
    # the object key - confirm that is what the receiving side expects.
    csv_filename = f"wmo_data_{station_pk}_{tag}.csv"
    write_csv(data_row, csv_filename)
    logging.info(f"Data has been written to: {csv_filename} for Station Id: {station_pk}")
    print(f"Data has been written to: {csv_filename} for Station Id: {station_pk}")

    # A second push to another wis2box instance used to be sketched here as
    # disabled code; call upload_csv() again with other settings if needed.
    upload_csv(csv_filename, station_pk)


def main():
    """Connect to the database and process every configured station."""
    try:
        # psycopg2's own "with connection" only ends the transaction; closing()
        # makes sure the connection itself is released.
        with closing(psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port
        )) as connection:
            logging.info('Database connection established')
            print("Database connection established")

            for station_pk in station_id:
                process_station(connection, station_pk)

    except psycopg2.Error as e:
        # Log the error if a psycopg2 error occurs
        logging.error(f"Database Connection Error: {e}")


if __name__ == "__main__":
    main()